You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/03/21 22:03:55 UTC

[incubator-skywalking] branch master updated: Support ES type/table namespace feature (#956)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e60d40  Support ES type/table namespace feature (#956)
0e60d40 is described below

commit 0e60d40d0121c5d57b6fe1c51dbddb982daf44c7
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Mar 22 06:03:51 2018 +0800

    Support ES type/table namespace feature (#956)
    
    * Propose the code structure for the table name prefix. Related to #932
    
    * Fix style check.
    
    * Keep ESDao table name in control.
    
    * Finish all code for the ES namespace feature.
---
 .../client/elasticsearch/ElasticSearchClient.java  |  96 +++++++++---
 .../collector-storage-es-provider/pom.xml          |   5 +
 .../storage/es/StorageModuleEsProvider.java        | 162 +++++----------------
 .../es/base/dao/AbstractPersistenceEsDAO.java      |  23 +--
 .../es/dao/GlobalTraceEsPersistenceDAO.java        |  21 ++-
 .../es/dao/SegmentDurationEsPersistenceDAO.java    |  21 ++-
 .../storage/es/dao/SegmentEsPersistenceDAO.java    |  21 ++-
 .../alarm/ApplicationAlarmEsPersistenceDAO.java    |  35 +++--
 .../ApplicationReferenceAlarmEsPersistenceDAO.java |  33 +++--
 ...licationReferenceAlarmListEsPersistenceDAO.java |  33 +++--
 .../dao/alarm/InstanceAlarmEsPersistenceDAO.java   |  33 +++--
 .../alarm/InstanceAlarmListEsPersistenceDAO.java   |  33 +++--
 .../InstanceReferenceAlarmEsPersistenceDAO.java    |  37 +++--
 ...InstanceReferenceAlarmListEsPersistenceDAO.java |  37 +++--
 .../es/dao/alarm/ServiceAlarmEsPersistenceDAO.java |  35 +++--
 .../alarm/ServiceAlarmListEsPersistenceDAO.java    |  35 +++--
 .../ServiceReferenceAlarmEsPersistenceDAO.java     |  45 +++---
 .../ServiceReferenceAlarmListEsPersistenceDAO.java |  45 +++---
 .../AbstractMemoryPoolMetricEsPersistenceDAO.java  |  32 ++--
 .../es/dao/register/InstanceRegisterEsDAO.java     |  32 ++--
 .../storage/es/dao/ui/CpuMetricEsUIDAO.java        |  25 ++--
 .../storage/es/dao/ui/GCMetricEsUIDAO.java         |  23 +--
 .../storage/es/dao/ui/InstanceMetricEsUIDAO.java   |  46 +++---
 .../storage/es/dao/ui/MemoryMetricEsUIDAO.java     |  33 +++--
 .../storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java |  44 ++----
 .../storage/es/dao/ui/ServiceMetricEsUIDAO.java    |  80 +++++-----
 26 files changed, 554 insertions(+), 511 deletions(-)

diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index f639f81..d58b955 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -19,12 +19,9 @@
 
 package org.apache.skywalking.apm.collector.client.elasticsearch;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
+import org.apache.skywalking.apm.collector.client.Client;
 import org.apache.skywalking.apm.collector.client.ClientException;
+import org.apache.skywalking.apm.collector.core.util.StringUtils;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
@@ -33,19 +30,25 @@ import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.MultiGetRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.apache.skywalking.apm.collector.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
+
 /**
  * @author peng-yongsheng
  */
@@ -55,37 +58,42 @@ public class ElasticSearchClient implements Client {
 
     private org.elasticsearch.client.Client client;
 
+    private final String namespace;
+
     private final String clusterName;
 
     private final Boolean clusterTransportSniffer;
 
     private final String clusterNodes;
 
-    public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
+    public ElasticSearchClient(String namespace, String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
+        this.namespace = namespace;
         this.clusterName = clusterName;
         this.clusterTransportSniffer = clusterTransportSniffer;
         this.clusterNodes = clusterNodes;
     }
 
-    @Override public void initialize() throws ClientException {
+    @Override
+    public void initialize() throws ClientException {
         Settings settings = Settings.builder()
-            .put("cluster.name", clusterName)
-            .put("client.transport.sniff", clusterTransportSniffer)
-            .build();
+                .put("cluster.name", clusterName)
+                .put("client.transport.sniff", clusterTransportSniffer)
+                .build();
 
         client = new PreBuiltTransportClient(settings);
 
         List<AddressPairs> pairsList = parseClusterNodes(clusterNodes);
         for (AddressPairs pairs : pairsList) {
             try {
-                ((PreBuiltTransportClient)client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
+                ((PreBuiltTransportClient) client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
             } catch (UnknownHostException e) {
                 throw new ElasticSearchClientException(e.getMessage(), e);
             }
         }
     }
 
-    @Override public void shutdown() {
+    @Override
+    public void shutdown() {
 
     }
 
@@ -114,12 +122,14 @@ public class ElasticSearchClient implements Client {
 
     public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
         IndicesAdminClient adminClient = client.admin().indices();
+        indexName = formatIndexName(indexName);
         CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
         logger.info("create {} index with type of {} finished, isAcknowledged: {}", indexName, indexType, response.isAcknowledged());
         return response.isShardsAcked();
     }
 
     public boolean deleteIndex(String indexName) {
+        indexName = formatIndexName(indexName);
         IndicesAdminClient adminClient = client.admin().indices();
         DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
         logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
@@ -127,44 +137,84 @@ public class ElasticSearchClient implements Client {
     }
 
     public boolean isExistsIndex(String indexName) {
+        indexName = formatIndexName(indexName);
         IndicesAdminClient adminClient = client.admin().indices();
         IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
         return response.isExists();
     }
 
     public SearchRequestBuilder prepareSearch(String indexName) {
+        indexName = formatIndexName(indexName);
         return client.prepareSearch(indexName);
     }
 
     public IndexRequestBuilder prepareIndex(String indexName, String id) {
+        indexName = formatIndexName(indexName);
         return client.prepareIndex(indexName, "type", id);
     }
 
     public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
+        indexName = formatIndexName(indexName);
         return client.prepareUpdate(indexName, "type", id);
     }
 
     public GetRequestBuilder prepareGet(String indexName, String id) {
+        indexName = formatIndexName(indexName);
         return client.prepareGet(indexName, "type", id);
     }
 
-    public DeleteByQueryRequestBuilder prepareDelete() {
-        return DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
+    public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
+        indexName = formatIndexName(indexName);
+        return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
     }
 
-    public MultiGetRequestBuilder prepareMultiGet() {
-        return client.prepareMultiGet();
+    public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
+        MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
+        rowHandler.setPrepareMultiGet(prepareMultiGet);
+        rowHandler.setNamespace(namespace);
+
+        rows.forEach(row -> {
+            rowHandler.accept(row);
+        });
+
+        return rowHandler.getPrepareMultiGet();
     }
 
+    public abstract static class MultiGetRowHandler<T> implements Consumer<T> {
+        private MultiGetRequestBuilder prepareMultiGet;
+        private String namespace;
+
+        public void setPrepareMultiGet(MultiGetRequestBuilder prepareMultiGet) {
+            this.prepareMultiGet = prepareMultiGet;
+        }
+
+        public void setNamespace(String namespace) {
+            this.namespace = namespace;
+        }
+
+        public void add(String indexName, @Nullable String type, String id) {
+            indexName = formatIndexName(namespace, indexName);
+            prepareMultiGet = prepareMultiGet.add(indexName, type, id);
+        }
+
+        private MultiGetRequestBuilder getPrepareMultiGet() {
+            return prepareMultiGet;
+        }
+    }
+
+
     public BulkRequestBuilder prepareBulk() {
         return client.prepareBulk();
     }
 
-    public void update(UpdateRequest updateRequest) {
-        try {
-            client.update(updateRequest).get();
-        } catch (InterruptedException | ExecutionException e) {
-            logger.error(e.getMessage(), e);
+    private String formatIndexName(String indexName) {
+        return formatIndexName(this.namespace, indexName);
+    }
+
+    private static String formatIndexName(String namespace, String indexName) {
+        if (StringUtils.isNotEmpty(namespace)) {
+            return namespace + "_" + indexName;
         }
+        return indexName;
     }
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml b/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml
index efd4772..0aa276a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/pom.xml
@@ -39,5 +39,10 @@
             <artifactId>collector-cluster-define</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>collector-configuration-define</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index 0e9cb27..ff52285 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es;
 
-import java.util.Properties;
-import java.util.UUID;
 import org.apache.skywalking.apm.collector.client.ClientException;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.cluster.ClusterModule;
 import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
 import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
+import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
+import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfig;
 import org.apache.skywalking.apm.collector.core.module.Module;
 import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
 import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
@@ -39,21 +39,7 @@ import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponent
 import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentHourPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMinutePersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMonthPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListDayPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListHourPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMinutePersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMonthPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.dao.alarm.*;
 import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationDayMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationHourMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationMinuteMetricPersistenceDAO;
@@ -70,16 +56,8 @@ import org.apache.skywalking.apm.collector.storage.dao.cache.IApplicationCacheDA
 import org.apache.skywalking.apm.collector.storage.dao.cache.IInstanceCacheDAO;
 import org.apache.skywalking.apm.collector.storage.dao.cache.INetworkAddressCacheDAO;
 import org.apache.skywalking.apm.collector.storage.dao.cache.IServiceNameCacheDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuSecondMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.gc.IGCSecondMetricPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.dao.cpu.*;
+import org.apache.skywalking.apm.collector.storage.dao.gc.*;
 import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO;
@@ -92,16 +70,8 @@ import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceDa
 import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceHourMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.memory.IMemorySecondMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolDayMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolHourMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMinuteMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolSecondMetricPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.dao.memory.*;
+import org.apache.skywalking.apm.collector.storage.dao.mpool.*;
 import org.apache.skywalking.apm.collector.storage.dao.register.IApplicationRegisterDAO;
 import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
 import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
@@ -114,27 +84,7 @@ import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDay
 import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmListUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationComponentUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMappingUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationReferenceMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IGlobalTraceUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceAlarmUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.INetworkAddressUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceAlarmUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO;
-import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetricUIDAO;
+import org.apache.skywalking.apm.collector.storage.dao.ui.*;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
 import org.apache.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO;
@@ -145,21 +95,7 @@ import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationCompone
 import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationComponentHourEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationComponentMinuteEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.acp.ApplicationComponentMonthEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsDayPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsHourPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsMinutePersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationAlarmListEsMonthPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationReferenceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ApplicationReferenceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceReferenceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.InstanceReferenceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceAlarmListEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceReferenceAlarmEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.alarm.ServiceReferenceAlarmListEsPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.alarm.*;
 import org.apache.skywalking.apm.collector.storage.es.dao.amp.ApplicationDayMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.amp.ApplicationHourMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.amp.ApplicationMinuteMetricEsPersistenceDAO;
@@ -176,16 +112,8 @@ import org.apache.skywalking.apm.collector.storage.es.dao.cache.ApplicationEsCac
 import org.apache.skywalking.apm.collector.storage.es.dao.cache.InstanceEsCacheDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.cache.NetworkAddressEsCacheDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.cache.ServiceNameEsCacheDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.cpu.CpuSecondMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.gc.GCSecondMetricEsPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.cpu.*;
+import org.apache.skywalking.apm.collector.storage.es.dao.gc.*;
 import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceDayMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceHourMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceMinuteMetricEsPersistenceDAO;
@@ -198,16 +126,8 @@ import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReference
 import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReferenceHourMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReferenceMinuteMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.irmp.InstanceReferenceMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemoryMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.memory.MemorySecondMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolDayMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolHourMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolMinuteMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.mpool.MemoryPoolSecondMetricEsPersistenceDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.memory.*;
+import org.apache.skywalking.apm.collector.storage.es.dao.mpool.*;
 import org.apache.skywalking.apm.collector.storage.es.dao.register.ApplicationRegisterEsDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.register.InstanceRegisterEsDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.register.NetworkAddressRegisterEsDAO;
@@ -220,30 +140,13 @@ import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceD
 import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceHourMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMinuteMetricEsPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMonthMetricEsPersistenceDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationAlarmEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationAlarmListEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationComponentEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationMappingEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ApplicationReferenceMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.CpuMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.GCMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.GlobalTraceEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.InstanceAlarmEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.InstanceEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.InstanceMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.MemoryMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.MemoryPoolMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.NetworkAddressEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.SegmentDurationEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.SegmentEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceAlarmEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceMetricEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceNameServiceEsUIDAO;
-import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceReferenceEsMetricUIDAO;
+import org.apache.skywalking.apm.collector.storage.es.dao.ui.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Properties;
+import java.util.UUID;
+
 /**
  * @author peng-yongsheng
  */
@@ -262,19 +165,23 @@ public class StorageModuleEsProvider extends ModuleProvider {
     private ElasticSearchClient elasticSearchClient;
     private DataTTLKeeperTimer deleteTimer;
 
-    @Override public String name() {
+    @Override
+    public String name() {
         return NAME;
     }
 
-    @Override public Class<? extends Module> module() {
+    @Override
+    public Class<? extends Module> module() {
         return StorageModule.class;
     }
 
-    @Override public void prepare(Properties config) throws ServiceNotProvidedException {
+    @Override
+    public void prepare(Properties config) throws ServiceNotProvidedException {
         String clusterName = config.getProperty(CLUSTER_NAME);
-        Boolean clusterTransportSniffer = (Boolean)config.get(CLUSTER_TRANSPORT_SNIFFER);
+        Boolean clusterTransportSniffer = (Boolean) config.get(CLUSTER_TRANSPORT_SNIFFER);
         String clusterNodes = config.getProperty(CLUSTER_NODES);
-        elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
+        String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
+        elasticSearchClient = new ElasticSearchClient(namespace, clusterName, clusterTransportSniffer, clusterNodes);
 
         this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
         registerCacheDAO();
@@ -284,9 +191,10 @@ public class StorageModuleEsProvider extends ModuleProvider {
         registerAlarmDAO();
     }
 
-    @Override public void start(Properties config) throws ServiceNotProvidedException {
-        Integer indexShardsNumber = (Integer)config.get(INDEX_SHARDS_NUMBER);
-        Integer indexReplicasNumber = (Integer)config.get(INDEX_REPLICAS_NUMBER);
+    @Override
+    public void start(Properties config) throws ServiceNotProvidedException {
+        Integer indexShardsNumber = (Integer) config.get(INDEX_SHARDS_NUMBER);
+        Integer indexReplicasNumber = (Integer) config.get(INDEX_REPLICAS_NUMBER);
         try {
             elasticSearchClient.initialize();
 
@@ -304,16 +212,18 @@ public class StorageModuleEsProvider extends ModuleProvider {
         ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
         moduleListenerService.addListener(namingListener);
 
-        Integer beforeDay = (Integer)config.getOrDefault(TIME_TO_LIVE_OF_DATA, 3);
+        Integer beforeDay = (Integer) config.getOrDefault(TIME_TO_LIVE_OF_DATA, 3);
         deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, uuId + 0, beforeDay);
     }
 
-    @Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
+    @Override
+    public void notifyAfterCompleted() throws ServiceNotProvidedException {
         deleteTimer.start();
     }
 
-    @Override public String[] requiredModules() {
-        return new String[] {ClusterModule.NAME};
+    @Override
+    public String[] requiredModules() {
+        return new String[]{ClusterModule.NAME, ConfigurationModule.NAME};
     }
 
     private void registerCacheDAO() throws ServiceNotProvidedException {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
index e98d2b5..eb22185 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.base.dao;
 
-import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -31,6 +30,8 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -46,7 +47,8 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
 
     protected abstract String tableName();
 
-    @Override public final STREAM_DATA get(String id) {
+    @Override
+    public final STREAM_DATA get(String id) {
         GetResponse getResponse = getClient().prepareGet(tableName(), id).get();
         if (getResponse.isExists()) {
             STREAM_DATA streamData = esDataToStreamData(getResponse.getSource());
@@ -59,25 +61,28 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
 
     protected abstract Map<String, Object> esStreamDataToEsData(STREAM_DATA streamData);
 
-    @Override public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
+    @Override
+    public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
         Map<String, Object> source = esStreamDataToEsData(streamData);
         return getClient().prepareIndex(tableName(), streamData.getId()).setSource(source);
     }
 
-    @Override public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
+    @Override
+    public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
         Map<String, Object> source = esStreamDataToEsData(streamData);
         return getClient().prepareUpdate(tableName(), streamData.getId()).setDoc(source);
     }
 
     protected abstract String timeBucketColumnNameForDelete();
 
-    @Override public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket))
-            .source(tableName())
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket),
+                tableName())
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, tableName());
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
index 2b9f23a..b134636 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.UnexpectedException;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
@@ -45,15 +46,18 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
         super(client);
     }
 
-    @Override public GlobalTrace get(String id) {
+    @Override
+    public GlobalTrace get(String id) {
         throw new UnexpectedException("There is no need to merge stream data with database data.");
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
         throw new UnexpectedException("There is no need to merge stream data with database data.");
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
         Map<String, Object> source = new HashMap<>();
         source.put(GlobalTraceTable.COLUMN_SEGMENT_ID, data.getSegmentId());
         source.put(GlobalTraceTable.COLUMN_GLOBAL_TRACE_ID, data.getGlobalTraceId());
@@ -62,13 +66,14 @@ public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePe
         return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(GlobalTraceTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(GlobalTraceTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                GlobalTraceTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, GlobalTraceTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
index 611666e..363bbc0 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
@@ -44,15 +45,18 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
         super(client);
     }
 
-    @Override public SegmentDuration get(String id) {
+    @Override
+    public SegmentDuration get(String id) {
         return null;
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(SegmentDuration data) {
         return null;
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
         logger.debug("segment cost prepareBatchInsert, getApplicationId: {}", data.getId());
         Map<String, Object> source = new HashMap<>();
         source.put(SegmentDurationTable.COLUMN_SEGMENT_ID, data.getSegmentId());
@@ -67,13 +71,14 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
         return getClient().prepareIndex(SegmentDurationTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(SegmentDurationTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(SegmentDurationTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                SegmentDurationTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, SegmentDurationTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
index 61bb6ad..31cda7e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
@@ -45,15 +46,18 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
         super(client);
     }
 
-    @Override public Segment get(String id) {
+    @Override
+    public Segment get(String id) {
         return null;
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
         return null;
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(Segment data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(Segment data) {
         Map<String, Object> source = new HashMap<>();
         source.put(SegmentTable.COLUMN_DATA_BINARY, new String(Base64.getEncoder().encode(data.getDataBinary())));
         source.put(SegmentTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
@@ -61,13 +65,14 @@ public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenc
         return getClient().prepareIndex(SegmentTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(SegmentTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(SegmentTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                SegmentTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, SegmentTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
index 4e3b058..d3d5603 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
@@ -18,8 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
@@ -34,6 +32,9 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -45,27 +46,29 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
         super(client);
     }
 
-    @Override public ApplicationAlarm get(String id) {
+    @Override
+    public ApplicationAlarm get(String id) {
         GetResponse getResponse = getClient().prepareGet(ApplicationAlarmTable.TABLE, id).get();
         if (getResponse.isExists()) {
             ApplicationAlarm instanceAlarm = new ApplicationAlarm();
             instanceAlarm.setId(id);
 
             Map<String, Object> source = getResponse.getSource();
-            instanceAlarm.setApplicationId(((Number)source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
-            instanceAlarm.setSourceValue(((Number)source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+            instanceAlarm.setApplicationId(((Number) source.get(ApplicationAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+            instanceAlarm.setSourceValue(((Number) source.get(ApplicationAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            instanceAlarm.setAlarmType(((Number)source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-            instanceAlarm.setAlarmContent((String)source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
+            instanceAlarm.setAlarmType(((Number) source.get(ApplicationAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+            instanceAlarm.setAlarmContent((String) source.get(ApplicationAlarmTable.COLUMN_ALARM_CONTENT));
 
-            instanceAlarm.setLastTimeBucket(((Number)source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+            instanceAlarm.setLastTimeBucket(((Number) source.get(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
             return instanceAlarm;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(ApplicationAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
@@ -78,7 +81,8 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
         return getClient().prepareIndex(ApplicationAlarmTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(ApplicationAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ApplicationAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(ApplicationAlarmTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
@@ -91,13 +95,14 @@ public class ApplicationAlarmEsPersistenceDAO extends EsDAO implements IApplicat
         return getClient().prepareUpdate(ApplicationAlarmTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(ApplicationAlarmTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(ApplicationAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ApplicationAlarmTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, ApplicationAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
index 30cb14b..33c1111 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
         super(client);
     }
 
-    @Override public ApplicationReferenceAlarm get(String id) {
+    @Override
+    public ApplicationReferenceAlarm get(String id) {
         GetResponse getResponse = getClient().prepareGet(ApplicationReferenceAlarmTable.TABLE, id).get();
         if (getResponse.isExists()) {
             ApplicationReferenceAlarm applicationReferenceAlarm = new ApplicationReferenceAlarm();
             applicationReferenceAlarm.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            applicationReferenceAlarm.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-            applicationReferenceAlarm.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-            applicationReferenceAlarm.setSourceValue(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+            applicationReferenceAlarm.setFrontApplicationId(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+            applicationReferenceAlarm.setBehindApplicationId(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+            applicationReferenceAlarm.setSourceValue(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            applicationReferenceAlarm.setAlarmType(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-            applicationReferenceAlarm.setAlarmContent((String)source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+            applicationReferenceAlarm.setAlarmType(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+            applicationReferenceAlarm.setAlarmContent((String) source.get(ApplicationReferenceAlarmTable.COLUMN_ALARM_CONTENT));
 
-            applicationReferenceAlarm.setLastTimeBucket(((Number)source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+            applicationReferenceAlarm.setLastTimeBucket(((Number) source.get(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
             return applicationReferenceAlarm;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarm data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
         return getClient().prepareIndex(ApplicationReferenceAlarmTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarm data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ApplicationReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ApplicationReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends EsDAO implements
         return getClient().prepareUpdate(ApplicationReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(ApplicationReferenceAlarmTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(ApplicationReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ApplicationReferenceAlarmTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, ApplicationReferenceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
index 4b7469c..062c794 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
@@ -45,27 +46,29 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
         super(client);
     }
 
-    @Override public ApplicationReferenceAlarmList get(String id) {
+    @Override
+    public ApplicationReferenceAlarmList get(String id) {
         GetResponse getResponse = getClient().prepareGet(ApplicationReferenceAlarmListTable.TABLE, id).get();
         if (getResponse.isExists()) {
             ApplicationReferenceAlarmList applicationReferenceAlarmList = new ApplicationReferenceAlarmList();
             applicationReferenceAlarmList.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            applicationReferenceAlarmList.setFrontApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-            applicationReferenceAlarmList.setBehindApplicationId(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-            applicationReferenceAlarmList.setSourceValue(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+            applicationReferenceAlarmList.setFrontApplicationId(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+            applicationReferenceAlarmList.setBehindApplicationId(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+            applicationReferenceAlarmList.setSourceValue(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            applicationReferenceAlarmList.setAlarmType(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-            applicationReferenceAlarmList.setAlarmContent((String)source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+            applicationReferenceAlarmList.setAlarmType(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+            applicationReferenceAlarmList.setAlarmContent((String) source.get(ApplicationReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
 
-            applicationReferenceAlarmList.setTimeBucket(((Number)source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+            applicationReferenceAlarmList.setTimeBucket(((Number) source.get(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
             return applicationReferenceAlarmList;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarmList data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(ApplicationReferenceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -79,7 +82,8 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
         return getClient().prepareIndex(ApplicationReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarmList data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(ApplicationReferenceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ApplicationReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ApplicationReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -93,13 +97,14 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends EsDAO impleme
         return getClient().prepareUpdate(ApplicationReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(ApplicationReferenceAlarmListTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(ApplicationReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ApplicationReferenceAlarmListTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, ApplicationReferenceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
index 64631c6..cec07f4 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
@@ -45,27 +46,29 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
         super(client);
     }
 
-    @Override public InstanceAlarm get(String id) {
+    @Override
+    public InstanceAlarm get(String id) {
         GetResponse getResponse = getClient().prepareGet(InstanceAlarmTable.TABLE, id).get();
         if (getResponse.isExists()) {
             InstanceAlarm instanceAlarm = new InstanceAlarm();
             instanceAlarm.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            instanceAlarm.setApplicationId(((Number)source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
-            instanceAlarm.setInstanceId(((Number)source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
-            instanceAlarm.setSourceValue(((Number)source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+            instanceAlarm.setApplicationId(((Number) source.get(InstanceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+            instanceAlarm.setInstanceId(((Number) source.get(InstanceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
+            instanceAlarm.setSourceValue(((Number) source.get(InstanceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            instanceAlarm.setAlarmType(((Number)source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-            instanceAlarm.setAlarmContent((String)source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
+            instanceAlarm.setAlarmType(((Number) source.get(InstanceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+            instanceAlarm.setAlarmContent((String) source.get(InstanceAlarmTable.COLUMN_ALARM_CONTENT));
 
-            instanceAlarm.setLastTimeBucket(((Number)source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+            instanceAlarm.setLastTimeBucket(((Number) source.get(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
             return instanceAlarm;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarm data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(InstanceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -79,7 +82,8 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
         return getClient().prepareIndex(InstanceAlarmTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(InstanceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -93,13 +97,14 @@ public class InstanceAlarmEsPersistenceDAO extends EsDAO implements IInstanceAla
         return getClient().prepareUpdate(InstanceAlarmTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(InstanceAlarmTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(InstanceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                InstanceAlarmTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, InstanceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
index 47cefae..ac350bd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
@@ -45,27 +46,29 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
         super(client);
     }
 
-    @Override public InstanceAlarmList get(String id) {
+    @Override
+    public InstanceAlarmList get(String id) {
         GetResponse getResponse = getClient().prepareGet(InstanceAlarmListTable.TABLE, id).get();
         if (getResponse.isExists()) {
             InstanceAlarmList instanceAlarmList = new InstanceAlarmList();
             instanceAlarmList.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            instanceAlarmList.setApplicationId(((Number)source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
-            instanceAlarmList.setInstanceId(((Number)source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
-            instanceAlarmList.setSourceValue(((Number)source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+            instanceAlarmList.setApplicationId(((Number) source.get(InstanceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
+            instanceAlarmList.setInstanceId(((Number) source.get(InstanceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
+            instanceAlarmList.setSourceValue(((Number) source.get(InstanceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            instanceAlarmList.setAlarmType(((Number)source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-            instanceAlarmList.setAlarmContent((String)source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
+            instanceAlarmList.setAlarmType(((Number) source.get(InstanceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+            instanceAlarmList.setAlarmContent((String) source.get(InstanceAlarmListTable.COLUMN_ALARM_CONTENT));
 
-            instanceAlarmList.setTimeBucket(((Number)source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+            instanceAlarmList.setTimeBucket(((Number) source.get(InstanceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
             return instanceAlarmList;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(InstanceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -79,7 +82,8 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
         return getClient().prepareIndex(InstanceAlarmListTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(InstanceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(InstanceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -93,13 +97,14 @@ public class InstanceAlarmListEsPersistenceDAO extends EsDAO implements IInstanc
         return getClient().prepareUpdate(InstanceAlarmListTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(InstanceAlarmListTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(InstanceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                InstanceAlarmListTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, InstanceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
index fd12f43..79d21da 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
         super(client);
     }
 
-    @Override public InstanceReferenceAlarm get(String id) {
+    @Override
+    public InstanceReferenceAlarm get(String id) {
         GetResponse getResponse = getClient().prepareGet(InstanceReferenceAlarmTable.TABLE, id).get();
         if (getResponse.isExists()) {
             InstanceReferenceAlarm instanceReferenceAlarm = new InstanceReferenceAlarm();
             instanceReferenceAlarm.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            instanceReferenceAlarm.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-            instanceReferenceAlarm.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-            instanceReferenceAlarm.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-            instanceReferenceAlarm.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-            instanceReferenceAlarm.setSourceValue(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+            instanceReferenceAlarm.setFrontApplicationId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+            instanceReferenceAlarm.setBehindApplicationId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+            instanceReferenceAlarm.setFrontInstanceId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+            instanceReferenceAlarm.setBehindInstanceId(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+            instanceReferenceAlarm.setSourceValue(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            instanceReferenceAlarm.setAlarmType(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-            instanceReferenceAlarm.setAlarmContent((String)source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+            instanceReferenceAlarm.setAlarmType(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+            instanceReferenceAlarm.setAlarmContent((String) source.get(InstanceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
 
-            instanceReferenceAlarm.setLastTimeBucket(((Number)source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+            instanceReferenceAlarm.setLastTimeBucket(((Number) source.get(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
             return instanceReferenceAlarm;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarm data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
         return getClient().prepareIndex(InstanceReferenceAlarmTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarm data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(InstanceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends EsDAO implements IIn
         return getClient().prepareUpdate(InstanceReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(InstanceReferenceAlarmTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(InstanceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                InstanceReferenceAlarmTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, InstanceReferenceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
index 7738849..2c93cbb 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
@@ -45,29 +46,31 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
         super(client);
     }
 
-    @Override public InstanceReferenceAlarmList get(String id) {
+    @Override
+    public InstanceReferenceAlarmList get(String id) {
         GetResponse getResponse = getClient().prepareGet(InstanceReferenceAlarmListTable.TABLE, id).get();
         if (getResponse.isExists()) {
             InstanceReferenceAlarmList serviceReferenceAlarmList = new InstanceReferenceAlarmList();
             serviceReferenceAlarmList.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-            serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-            serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-            serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-            serviceReferenceAlarmList.setSourceValue(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+            serviceReferenceAlarmList.setFrontApplicationId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+            serviceReferenceAlarmList.setBehindApplicationId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+            serviceReferenceAlarmList.setFrontInstanceId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+            serviceReferenceAlarmList.setBehindInstanceId(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+            serviceReferenceAlarmList.setSourceValue(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            serviceReferenceAlarmList.setAlarmType(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-            serviceReferenceAlarmList.setAlarmContent((String)source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+            serviceReferenceAlarmList.setAlarmType(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+            serviceReferenceAlarmList.setAlarmContent((String) source.get(InstanceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
 
-            serviceReferenceAlarmList.setTimeBucket(((Number)source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+            serviceReferenceAlarmList.setTimeBucket(((Number) source.get(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
             return serviceReferenceAlarmList;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarmList data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(InstanceReferenceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -83,7 +86,8 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
         return getClient().prepareIndex(InstanceReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarmList data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(InstanceReferenceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(InstanceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -99,13 +103,14 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
         return getClient().prepareUpdate(InstanceReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(InstanceReferenceAlarmListTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(InstanceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                InstanceReferenceAlarmListTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, InstanceReferenceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
index 44d8265..a48a79d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
@@ -45,28 +46,30 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
         super(client);
     }
 
-    @Override public ServiceAlarm get(String id) {
+    @Override
+    public ServiceAlarm get(String id) {
         GetResponse getResponse = getClient().prepareGet(ServiceAlarmTable.TABLE, id).get();
         if (getResponse.isExists()) {
             ServiceAlarm serviceAlarm = new ServiceAlarm();
             serviceAlarm.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            serviceAlarm.setApplicationId(((Number)source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
-            serviceAlarm.setInstanceId(((Number)source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
-            serviceAlarm.setServiceId(((Number)source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
-            serviceAlarm.setSourceValue(((Number)source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+            serviceAlarm.setApplicationId(((Number) source.get(ServiceAlarmTable.COLUMN_APPLICATION_ID)).intValue());
+            serviceAlarm.setInstanceId(((Number) source.get(ServiceAlarmTable.COLUMN_INSTANCE_ID)).intValue());
+            serviceAlarm.setServiceId(((Number) source.get(ServiceAlarmTable.COLUMN_SERVICE_ID)).intValue());
+            serviceAlarm.setSourceValue(((Number) source.get(ServiceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            serviceAlarm.setAlarmType(((Number)source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-            serviceAlarm.setAlarmContent((String)source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
+            serviceAlarm.setAlarmType(((Number) source.get(ServiceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+            serviceAlarm.setAlarmContent((String) source.get(ServiceAlarmTable.COLUMN_ALARM_CONTENT));
 
-            serviceAlarm.setLastTimeBucket(((Number)source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+            serviceAlarm.setLastTimeBucket(((Number) source.get(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
             return serviceAlarm;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(ServiceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -81,7 +84,8 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
         return getClient().prepareIndex(ServiceAlarmTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceAlarmTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(ServiceAlarmTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -96,13 +100,14 @@ public class ServiceAlarmEsPersistenceDAO extends EsDAO implements IServiceAlarm
         return getClient().prepareUpdate(ServiceAlarmTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(ServiceAlarmTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(ServiceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ServiceAlarmTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, ServiceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
index 56bf3c8..6cb8175 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
@@ -45,28 +46,30 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
         super(client);
     }
 
-    @Override public ServiceAlarmList get(String id) {
+    @Override
+    public ServiceAlarmList get(String id) {
         GetResponse getResponse = getClient().prepareGet(ServiceAlarmListTable.TABLE, id).get();
         if (getResponse.isExists()) {
             ServiceAlarmList serviceAlarmList = new ServiceAlarmList();
             serviceAlarmList.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            serviceAlarmList.setApplicationId(((Number)source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
-            serviceAlarmList.setInstanceId(((Number)source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
-            serviceAlarmList.setServiceId(((Number)source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
-            serviceAlarmList.setSourceValue(((Number)source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+            serviceAlarmList.setApplicationId(((Number) source.get(ServiceAlarmListTable.COLUMN_APPLICATION_ID)).intValue());
+            serviceAlarmList.setInstanceId(((Number) source.get(ServiceAlarmListTable.COLUMN_INSTANCE_ID)).intValue());
+            serviceAlarmList.setServiceId(((Number) source.get(ServiceAlarmListTable.COLUMN_SERVICE_ID)).intValue());
+            serviceAlarmList.setSourceValue(((Number) source.get(ServiceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
 
-            serviceAlarmList.setAlarmType(((Number)source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-            serviceAlarmList.setAlarmContent((String)source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
+            serviceAlarmList.setAlarmType(((Number) source.get(ServiceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+            serviceAlarmList.setAlarmContent((String) source.get(ServiceAlarmListTable.COLUMN_ALARM_CONTENT));
 
-            serviceAlarmList.setTimeBucket(((Number)source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+            serviceAlarmList.setTimeBucket(((Number) source.get(ServiceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
             return serviceAlarmList;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(ServiceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -81,7 +84,8 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
         return getClient().prepareIndex(ServiceAlarmListTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(ServiceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceAlarmListTable.COLUMN_APPLICATION_ID, data.getApplicationId());
         source.put(ServiceAlarmListTable.COLUMN_INSTANCE_ID, data.getInstanceId());
@@ -96,13 +100,14 @@ public class ServiceAlarmListEsPersistenceDAO extends EsDAO implements IServiceA
         return getClient().prepareUpdate(ServiceAlarmListTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(ServiceAlarmListTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(ServiceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ServiceAlarmListTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, ServiceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
index 6934afc..9400024 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
         super(client);
     }
 
-    @Override public ServiceReferenceAlarm get(String id) {
+    @Override
+    public ServiceReferenceAlarm get(String id) {
         GetResponse getResponse = getClient().prepareGet(ServiceReferenceAlarmTable.TABLE, id).get();
         if (getResponse.isExists()) {
             ServiceReferenceAlarm serviceReferenceAlarm = new ServiceReferenceAlarm();
             serviceReferenceAlarm.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            serviceReferenceAlarm.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-            serviceReferenceAlarm.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-            serviceReferenceAlarm.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-            serviceReferenceAlarm.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-            serviceReferenceAlarm.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
-            serviceReferenceAlarm.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
-            serviceReferenceAlarm.setSourceValue(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
-
-            serviceReferenceAlarm.setAlarmType(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
-            serviceReferenceAlarm.setAlarmContent((String)source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
-
-            serviceReferenceAlarm.setLastTimeBucket(((Number)source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
+            serviceReferenceAlarm.setFrontApplicationId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+            serviceReferenceAlarm.setBehindApplicationId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+            serviceReferenceAlarm.setFrontInstanceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+            serviceReferenceAlarm.setBehindInstanceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+            serviceReferenceAlarm.setFrontServiceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_FRONT_SERVICE_ID)).intValue());
+            serviceReferenceAlarm.setBehindServiceId(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
+            serviceReferenceAlarm.setSourceValue(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_SOURCE_VALUE)).intValue());
+
+            serviceReferenceAlarm.setAlarmType(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_TYPE)).intValue());
+            serviceReferenceAlarm.setAlarmContent((String) source.get(ServiceReferenceAlarmTable.COLUMN_ALARM_CONTENT));
+
+            serviceReferenceAlarm.setLastTimeBucket(((Number) source.get(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET)).longValue());
             return serviceReferenceAlarm;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarm data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
         return getClient().prepareIndex(ServiceReferenceAlarmTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarm data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarm data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceReferenceAlarmTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ServiceReferenceAlarmTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends EsDAO implements ISer
         return getClient().prepareUpdate(ServiceReferenceAlarmTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(ServiceReferenceAlarmTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(ServiceReferenceAlarmTable.COLUMN_LAST_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ServiceReferenceAlarmTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceAlarmTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
index b3a8e4e..3014ca1 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
@@ -45,31 +46,33 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
         super(client);
     }
 
-    @Override public ServiceReferenceAlarmList get(String id) {
+    @Override
+    public ServiceReferenceAlarmList get(String id) {
         GetResponse getResponse = getClient().prepareGet(ServiceReferenceAlarmListTable.TABLE, id).get();
         if (getResponse.isExists()) {
             ServiceReferenceAlarmList serviceReferenceAlarmList = new ServiceReferenceAlarmList();
             serviceReferenceAlarmList.setId(id);
             Map<String, Object> source = getResponse.getSource();
-            serviceReferenceAlarmList.setFrontApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
-            serviceReferenceAlarmList.setBehindApplicationId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
-            serviceReferenceAlarmList.setFrontInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
-            serviceReferenceAlarmList.setBehindInstanceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
-            serviceReferenceAlarmList.setFrontServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
-            serviceReferenceAlarmList.setBehindServiceId(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
-            serviceReferenceAlarmList.setSourceValue(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
-
-            serviceReferenceAlarmList.setAlarmType(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
-            serviceReferenceAlarmList.setAlarmContent((String)source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
-
-            serviceReferenceAlarmList.setTimeBucket(((Number)source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
+            serviceReferenceAlarmList.setFrontApplicationId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
+            serviceReferenceAlarmList.setBehindApplicationId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
+            serviceReferenceAlarmList.setFrontInstanceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
+            serviceReferenceAlarmList.setBehindInstanceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
+            serviceReferenceAlarmList.setFrontServiceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_FRONT_SERVICE_ID)).intValue());
+            serviceReferenceAlarmList.setBehindServiceId(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
+            serviceReferenceAlarmList.setSourceValue(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_SOURCE_VALUE)).intValue());
+
+            serviceReferenceAlarmList.setAlarmType(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_TYPE)).intValue());
+            serviceReferenceAlarmList.setAlarmContent((String) source.get(ServiceReferenceAlarmListTable.COLUMN_ALARM_CONTENT));
+
+            serviceReferenceAlarmList.setTimeBucket(((Number) source.get(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET)).longValue());
             return serviceReferenceAlarmList;
         } else {
             return null;
         }
     }
 
-    @Override public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarmList data) {
+    @Override
+    public IndexRequestBuilder prepareBatchInsert(ServiceReferenceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -87,7 +90,8 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
         return getClient().prepareIndex(ServiceReferenceAlarmListTable.TABLE, data.getId()).setSource(source);
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarmList data) {
+    @Override
+    public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceAlarmList data) {
         Map<String, Object> source = new HashMap<>();
         source.put(ServiceReferenceAlarmListTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
         source.put(ServiceReferenceAlarmListTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
@@ -105,13 +109,14 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends EsDAO implements
         return getClient().prepareUpdate(ServiceReferenceAlarmListTable.TABLE, data.getId()).setDoc(source);
     }
 
-    @Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
+    @Override
+    public void deleteHistory(Long startTimestamp, Long endTimestamp) {
         long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
         long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
-        BulkByScrollResponse response = getClient().prepareDelete()
-            .filter(QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
-            .source(ServiceReferenceAlarmListTable.TABLE)
-            .get();
+        BulkByScrollResponse response = getClient().prepareDelete(
+                QueryBuilders.rangeQuery(ServiceReferenceAlarmListTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket),
+                ServiceReferenceAlarmListTable.TABLE)
+                .get();
 
         long deleted = response.getDeleted();
         logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceAlarmListTable.TABLE);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
index 2d10fdf..4741f2e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
@@ -18,13 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.mpool;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
 import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
 import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -34,28 +35,31 @@ public abstract class AbstractMemoryPoolMetricEsPersistenceDAO extends AbstractP
         super(client);
     }
 
-    @Override protected final String timeBucketColumnNameForDelete() {
+    @Override
+    protected final String timeBucketColumnNameForDelete() {
         return MemoryPoolMetricTable.COLUMN_TIME_BUCKET;
     }
 
-    @Override protected final MemoryPoolMetric esDataToStreamData(Map<String, Object> source) {
+    @Override
+    protected final MemoryPoolMetric esDataToStreamData(Map<String, Object> source) {
         MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetric();
-        memoryPoolMetric.setMetricId((String)source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
+        memoryPoolMetric.setMetricId((String) source.get(MemoryPoolMetricTable.COLUMN_METRIC_ID));
 
-        memoryPoolMetric.setInstanceId(((Number)source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
-        memoryPoolMetric.setPoolType(((Number)source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
+        memoryPoolMetric.setInstanceId(((Number) source.get(MemoryPoolMetricTable.COLUMN_INSTANCE_ID)).intValue());
+        memoryPoolMetric.setPoolType(((Number) source.get(MemoryPoolMetricTable.COLUMN_POOL_TYPE)).intValue());
 
-        memoryPoolMetric.setInit(((Number)source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
-        memoryPoolMetric.setMax(((Number)source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
-        memoryPoolMetric.setUsed(((Number)source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
-        memoryPoolMetric.setCommitted(((Number)source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
-        memoryPoolMetric.setTimes(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
+        memoryPoolMetric.setInit(((Number) source.get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
+        memoryPoolMetric.setMax(((Number) source.get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
+        memoryPoolMetric.setUsed(((Number) source.get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
+        memoryPoolMetric.setCommitted(((Number) source.get(MemoryPoolMetricTable.COLUMN_COMMITTED)).longValue());
+        memoryPoolMetric.setTimes(((Number) source.get(MemoryPoolMetricTable.COLUMN_TIMES)).longValue());
 
-        memoryPoolMetric.setTimeBucket(((Number)source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
+        memoryPoolMetric.setTimeBucket(((Number) source.get(MemoryPoolMetricTable.COLUMN_TIME_BUCKET)).longValue());
         return memoryPoolMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
+    @Override
+    protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
         Map<String, Object> source = new HashMap<>();
         source.put(MemoryPoolMetricTable.COLUMN_METRIC_ID, streamData.getMetricId());
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
index 074ec58..93a75dc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/register/InstanceRegisterEsDAO.java
@@ -18,8 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.register;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
@@ -28,10 +26,13 @@ import org.apache.skywalking.apm.collector.storage.table.register.Instance;
 import org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * @author peng-yongsheng
  */
@@ -43,15 +44,18 @@ public class InstanceRegisterEsDAO extends EsDAO implements IInstanceRegisterDAO
         super(client);
     }
 
-    @Override public int getMaxInstanceId() {
+    @Override
+    public int getMaxInstanceId() {
         return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
     }
 
-    @Override public int getMinInstanceId() {
+    @Override
+    public int getMinInstanceId() {
         return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
     }
 
-    @Override public void save(Instance instance) {
+    @Override
+    public void save(Instance instance) {
         logger.debug("save instance register info, application getApplicationId: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
         ElasticSearchClient client = getClient();
         Map<String, Object> source = new HashMap<>();
@@ -69,18 +73,14 @@ public class InstanceRegisterEsDAO extends EsDAO implements IInstanceRegisterDAO
         logger.debug("save instance register info, application getApplicationId: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name());
     }
 
-    @Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
-        ElasticSearchClient client = getClient();
-        UpdateRequest updateRequest = new UpdateRequest();
-        updateRequest.index(InstanceTable.TABLE);
-        updateRequest.type(InstanceTable.TABLE_TYPE);
-        updateRequest.id(String.valueOf(instanceId));
-        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-
+    @Override
+    public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
+        UpdateRequestBuilder updateRequestBuilder = getClient().prepareUpdate(InstanceTable.TABLE, String.valueOf(instanceId));
+        updateRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         Map<String, Object> source = new HashMap<>();
         source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartbeatTime));
+        updateRequestBuilder.setDoc(source);
 
-        updateRequest.doc(source);
-        client.update(updateRequest);
+        updateRequestBuilder.get();
     }
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
index bd07a48..1d4b1ea 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/CpuMetricEsUIDAO.java
@@ -18,8 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.LinkedList;
-import java.util.List;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO;
@@ -32,6 +30,9 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
 import org.elasticsearch.action.get.MultiGetRequestBuilder;
 import org.elasticsearch.action.get.MultiGetResponse;
 
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * @author peng-yongsheng
  */
@@ -41,22 +42,26 @@ public class CpuMetricEsUIDAO extends EsDAO implements ICpuMetricUIDAO {
         super(client);
     }
 
-    @Override public List<Integer> getCPUTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+    @Override
+    public List<Integer> getCPUTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
         String tableName = TimePyramidTableNameBuilder.build(step, CpuMetricTable.TABLE);
 
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId;
-            prepareMultiGet.add(tableName, CpuMetricTable.TABLE_TYPE, id);
+        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId;
+                this.add(tableName, CpuMetricTable.TABLE_TYPE, id);
+            }
         });
 
+
         List<Integer> cpuTrends = new LinkedList<>();
         MultiGetResponse multiGetResponse = prepareMultiGet.get();
         for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
             if (response.getResponse().isExists()) {
-                double cpuUsed = ((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
-                long times = ((Number)response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
-                cpuTrends.add((int)((cpuUsed / times) * 100));
+                double cpuUsed = ((Number) response.getResponse().getSource().get(CpuMetricTable.COLUMN_USAGE_PERCENT)).doubleValue();
+                long times = ((Number) response.getResponse().getSource().get(CpuMetricTable.COLUMN_TIMES)).longValue();
+                cpuTrends.add((int) ((cpuUsed / times) * 100));
             } else {
                 cpuTrends.add(0);
             }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
index d983bcd..30db5bd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/GCMetricEsUIDAO.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
@@ -42,30 +43,34 @@ public class GCMetricEsUIDAO extends EsDAO implements IGCMetricUIDAO {
         super(client);
     }
 
-    @Override public List<Integer> getYoungGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+    @Override
+    public List<Integer> getYoungGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
         return getGCTrend(instanceId, step, durationPoints, GCPhrase.NEW_VALUE);
     }
 
-    @Override public List<Integer> getOldGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+    @Override
+    public List<Integer> getOldGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
         return getGCTrend(instanceId, step, durationPoints, GCPhrase.OLD_VALUE);
     }
 
     private List<Integer> getGCTrend(int instanceId, Step step, List<DurationPoint> durationPoints, int gcPhrase) {
         String tableName = TimePyramidTableNameBuilder.build(step, GCMetricTable.TABLE);
 
-        MultiGetRequestBuilder youngPrepareMultiGet = getClient().prepareMultiGet();
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + gcPhrase;
-            youngPrepareMultiGet.add(tableName, GCMetricTable.TABLE_TYPE, id);
+        MultiGetRequestBuilder youngPrepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + gcPhrase;
+                add(tableName, GCMetricTable.TABLE_TYPE, id);
+            }
         });
 
         List<Integer> gcTrends = new LinkedList<>();
         MultiGetResponse multiGetResponse = youngPrepareMultiGet.get();
         for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
             if (itemResponse.getResponse().isExists()) {
-                long count = ((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
-                long times = ((Number)itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
-                gcTrends.add((int)(count / times));
+                long count = ((Number) itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_COUNT)).longValue();
+                long times = ((Number) itemResponse.getResponse().getSource().get(GCMetricTable.COLUMN_TIMES)).intValue();
+                gcTrends.add((int) (count / times));
             } else {
                 gcTrends.add(0);
             }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
index 81cde47..dbac89c 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceMetricEsUIDAO.java
@@ -18,8 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.LinkedList;
-import java.util.List;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
@@ -44,6 +42,9 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * @author peng-yongsheng
  */
@@ -53,8 +54,9 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
         super(client);
     }
 
-    @Override public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
-        long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
+    @Override
+    public List<AppServerInfo> getServerThroughput(int applicationId, Step step, long startTimeBucket,
+                                                   long endTimeBucket, int secondBetween, int topN, MetricSource metricSource) {
         String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
 
         SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -82,8 +84,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
         instanceIdTerms.getBuckets().forEach(instanceIdTerm -> {
             int instanceId = instanceIdTerm.getKeyAsNumber().intValue();
             Sum callSum = instanceIdTerm.getAggregations().get(ApplicationMetricTable.COLUMN_TRANSACTION_CALLS);
-            long calls = (long)callSum.getValue();
-            int callsPerSec = (int)(secondBetween == 0 ? 0 : calls / secondBetween);
+            long calls = (long) callSum.getValue();
+            int callsPerSec = (int) (secondBetween == 0 ? 0 : calls / secondBetween);
 
             AppServerInfo appServerInfo = new AppServerInfo();
             appServerInfo.setId(instanceId);
@@ -103,13 +105,15 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
         }
     }
 
-    @Override public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+    @Override
+    public List<Integer> getServerTPSTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
         String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
-
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
-            prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+                add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+            }
         });
 
         List<Integer> throughputTrend = new LinkedList<>();
@@ -118,8 +122,8 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
         for (int i = 0; i < multiGetResponse.getResponses().length; i++) {
             MultiGetItemResponse response = multiGetResponse.getResponses()[i];
             if (response.getResponse().isExists()) {
-                long callTimes = ((Number)response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
-                throughputTrend.add((int)(callTimes / durationPoints.get(i).getSecondsBetween()));
+                long callTimes = ((Number) response.getResponse().getSource().get(InstanceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+                throughputTrend.add((int) (callTimes / durationPoints.get(i).getSecondsBetween()));
             } else {
                 throughputTrend.add(0);
             }
@@ -127,15 +131,19 @@ public class InstanceMetricEsUIDAO extends EsDAO implements IInstanceMetricUIDAO
         return throughputTrend;
     }
 
-    @Override public List<Integer> getResponseTimeTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+    @Override
+    public List<Integer> getResponseTimeTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
         String tableName = TimePyramidTableNameBuilder.build(step, InstanceMetricTable.TABLE);
+        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+                add(tableName, InstanceMetricTable.TABLE_TYPE, id);
+            }
 
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
-            prepareMultiGet.add(tableName, InstanceMetricTable.TABLE_TYPE, id);
         });
 
+
         List<Integer> responseTimeTrends = new LinkedList<>();
         MultiGetResponse multiGetResponse = prepareMultiGet.get();
         for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
index 6e5e891..fb30adf 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryMetricEsUIDAO.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.List;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
 import org.apache.skywalking.apm.collector.core.util.Const;
@@ -32,6 +31,8 @@ import org.elasticsearch.action.get.MultiGetItemResponse;
 import org.elasticsearch.action.get.MultiGetRequestBuilder;
 import org.elasticsearch.action.get.MultiGetResponse;
 
+import java.util.List;
+
 /**
  * @author peng-yongsheng
  */
@@ -41,38 +42,42 @@ public class MemoryMetricEsUIDAO extends EsDAO implements IMemoryMetricUIDAO {
         super(client);
     }
 
-    @Override public Trend getHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+    @Override
+    public Trend getHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
         return getMemoryTrend(instanceId, step, durationPoints, true);
     }
 
-    @Override public Trend getNoHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
+    @Override
+    public Trend getNoHeapMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints) {
         return getMemoryTrend(instanceId, step, durationPoints, false);
     }
 
     private Trend getMemoryTrend(int instanceId, Step step, List<DurationPoint> durationPoints,
-        boolean isHeap) {
+                                 boolean isHeap) {
         String tableName = TimePyramidTableNameBuilder.build(step, MemoryMetricTable.TABLE);
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
+                add(tableName, MemoryMetricTable.TABLE_TYPE, id);
+            }
 
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + BooleanUtils.booleanToValue(isHeap);
-            prepareMultiGet.add(tableName, MemoryMetricTable.TABLE_TYPE, id);
         });
 
         Trend trend = new Trend();
         MultiGetResponse multiGetResponse = prepareMultiGet.get();
         for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
             if (response.getResponse().isExists()) {
-                long max = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
-                long used = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
-                long times = ((Number)response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
+                long max = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_MAX)).longValue();
+                long used = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_USED)).longValue();
+                long times = ((Number) response.getResponse().getSource().get(MemoryMetricTable.COLUMN_TIMES)).longValue();
 
-                trend.getMetrics().add((int)(used / times));
+                trend.getMetrics().add((int) (used / times));
 
                 if (max < 0) {
-                    trend.getMaxMetrics().add((int)(used / times));
+                    trend.getMaxMetrics().add((int) (used / times));
                 } else {
-                    trend.getMaxMetrics().add((int)(max / times));
+                    trend.getMaxMetrics().add((int) (max / times));
                 }
             } else {
                 trend.getMetrics().add(0);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
index 2201b43..5f769b7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/MemoryPoolMetricEsUIDAO.java
@@ -18,17 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.apm.collector.core.UnexpectedException;
 import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryPoolMetricUIDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
 import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
 
 /**
  * @author peng-yongsheng
@@ -39,15 +36,16 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements IMemoryPoolMetricU
         super(client);
     }
 
-    @Override public JsonObject getMetric(int instanceId, long timeBucket, int poolType) {
+    @Override
+    public JsonObject getMetric(int instanceId, long timeBucket, int poolType) {
         String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType;
         GetResponse getResponse = getClient().prepareGet(MemoryPoolMetricTable.TABLE, id).get();
 
         JsonObject metric = new JsonObject();
         if (getResponse.isExists()) {
-            metric.addProperty("max", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
-            metric.addProperty("init", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
-            metric.addProperty("used", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
+            metric.addProperty("max", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue());
+            metric.addProperty("init", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue());
+            metric.addProperty("used", ((Number) getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue());
         } else {
             metric.addProperty("max", 0);
             metric.addProperty("init", 0);
@@ -56,32 +54,8 @@ public class MemoryPoolMetricEsUIDAO extends EsDAO implements IMemoryPoolMetricU
         return metric;
     }
 
-    @Override public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) {
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
-
-        long timeBucket = startTimeBucket;
-        do {
-//            timeBucket = TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND, timeBucket, 1);
-            String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType;
-            prepareMultiGet.add(MemoryPoolMetricTable.TABLE, MemoryPoolMetricTable.TABLE_TYPE, id);
-        }
-        while (timeBucket <= endTimeBucket);
-
-        JsonObject metric = new JsonObject();
-        JsonArray usedMetric = new JsonArray();
-        MultiGetResponse multiGetResponse = prepareMultiGet.get();
-        for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
-            if (response.getResponse().isExists()) {
-                metric.addProperty("max", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).longValue());
-                metric.addProperty("init", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).longValue());
-                usedMetric.add(((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_USED)).longValue());
-            } else {
-                metric.addProperty("max", 0);
-                metric.addProperty("init", 0);
-                usedMetric.add(0);
-            }
-        }
-        metric.add("used", usedMetric);
-        return metric;
+    @Override
+    public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) {
+        throw new UnexpectedException("Not implement methodø");
     }
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
index 96761a1..34ae413 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ServiceMetricEsUIDAO.java
@@ -18,11 +18,6 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
@@ -51,6 +46,8 @@ import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.elasticsearch.search.sort.SortOrder;
 
+import java.util.*;
+
 /**
  * @author peng-yongsheng
  */
@@ -62,23 +59,24 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
 
     @Override
     public List<Integer> getServiceResponseTimeTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
         String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
-
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
-            prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+                add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+            }
         });
 
         List<Integer> trends = new LinkedList<>();
         MultiGetResponse multiGetResponse = prepareMultiGet.get();
         for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
             if (response.getResponse().isExists()) {
-                long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
-                long errorCalls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
-                long durationSum = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
-                long errorDurationSum = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
-                trends.add((int)((durationSum - errorDurationSum) / (calls - errorCalls)));
+                long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+                long errorCalls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
+                long durationSum = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue();
+                long errorDurationSum = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue();
+                trends.add((int) ((durationSum - errorDurationSum) / (calls - errorCalls)));
             } else {
                 trends.add(0);
             }
@@ -86,13 +84,15 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
         return trends;
     }
 
-    @Override public List<Integer> getServiceTPSTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+    @Override
+    public List<Integer> getServiceTPSTrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
         String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
-
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
-            prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+                add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+            }
         });
 
         List<Integer> trends = new LinkedList<>();
@@ -101,9 +101,9 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
         int index = 0;
         for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
             if (response.getResponse().isExists()) {
-                long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+                long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
                 long secondBetween = durationPoints.get(index).getSecondsBetween();
-                trends.add((int)(calls / secondBetween));
+                trends.add((int) (calls / secondBetween));
             } else {
                 trends.add(0);
             }
@@ -112,22 +112,24 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
         return trends;
     }
 
-    @Override public List<Integer> getServiceSLATrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
-        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet();
+    @Override
+    public List<Integer> getServiceSLATrend(int serviceId, Step step, List<DurationPoint> durationPoints) {
         String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
-
-        durationPoints.forEach(durationPoint -> {
-            String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
-            prepareMultiGet.add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+        MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(durationPoints, new ElasticSearchClient.MultiGetRowHandler<DurationPoint>() {
+            @Override
+            public void accept(DurationPoint durationPoint) {
+                String id = durationPoint.getPoint() + Const.ID_SPLIT + serviceId + Const.ID_SPLIT + MetricSource.Callee.getValue();
+                add(tableName, ServiceMetricTable.TABLE_TYPE, id);
+            }
         });
 
         List<Integer> trends = new LinkedList<>();
         MultiGetResponse multiGetResponse = prepareMultiGet.get();
         for (MultiGetItemResponse response : multiGetResponse.getResponses()) {
             if (response.getResponse().isExists()) {
-                long calls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
-                long errorCalls = ((Number)response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
-                trends.add((int)(((calls - errorCalls) / calls)) * 10000);
+                long calls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue();
+                long errorCalls = ((Number) response.getResponse().getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue();
+                trends.add((int) (((calls - errorCalls) / calls)) * 10000);
             } else {
                 trends.add(10000);
             }
@@ -137,7 +139,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
 
     @Override
     public List<Node> getServicesMetric(Step step, long startTime, long endTime, MetricSource metricSource,
-        Collection<Integer> serviceIds) {
+                                        Collection<Integer> serviceIds) {
         String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
 
         SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -169,8 +171,8 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
 
             ServiceNode serviceNode = new ServiceNode();
             serviceNode.setId(serviceId);
-            serviceNode.setCalls((long)callsSum.getValue());
-            serviceNode.setSla((int)(((callsSum.getValue() - errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
+            serviceNode.setCalls((long) callsSum.getValue());
+            serviceNode.setSla((int) (((callsSum.getValue() - errorCallsSum.getValue()) / callsSum.getValue()) * 10000));
             nodes.add(serviceNode);
         });
         return nodes;
@@ -178,7 +180,7 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
 
     @Override
     public List<ServiceMetric> getSlowService(int applicationId, Step step, long startTimeBucket, long endTimeBucket,
-        Integer topN, MetricSource metricSource) {
+                                              Integer topN, MetricSource metricSource) {
         String tableName = TimePyramidTableNameBuilder.build(step, ServiceMetricTable.TABLE);
 
         SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
@@ -202,12 +204,12 @@ public class ServiceMetricEsUIDAO extends EsDAO implements IServiceMetricUIDAO {
         Set<Integer> serviceIds = new HashSet<>();
         List<ServiceMetric> serviceMetrics = new LinkedList<>();
         for (SearchHit searchHit : searchHits) {
-            int serviceId = ((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
+            int serviceId = ((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue();
             if (!serviceIds.contains(serviceId)) {
                 ServiceMetric serviceMetric = new ServiceMetric();
                 serviceMetric.setId(serviceId);
-                serviceMetric.setCalls(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
-                serviceMetric.setAvgResponseTime(((Number)searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
+                serviceMetric.setCalls(((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
+                serviceMetric.setAvgResponseTime(((Number) searchHit.getSource().get(ServiceMetricTable.COLUMN_TRANSACTION_AVERAGE_DURATION)).intValue());
                 serviceMetrics.add(serviceMetric);
 
                 serviceIds.add(serviceId);

-- 
To stop receiving notification emails like this one, please contact
wusheng@apache.org.