You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by pe...@apache.org on 2018/06/25 13:19:56 UTC

[incubator-skywalking] branch master updated: [Collector] Topology query tuning, Batch Process instead of bulk. (#1384)

This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e03ec8  [Collector] Topology query tuning, Batch Process instead of bulk. (#1384)
5e03ec8 is described below

commit 5e03ec8845105fc2fb56ab680a8d838e108639c4
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Mon Jun 25 21:19:54 2018 +0800

    [Collector] Topology query tuning, Batch Process instead of bulk. (#1384)
    
    * #1202
    
    1. Check whether logging is enabled at the DEBUG level before printing messages.
    2. Initialize the columns as static attributes.
    3. Topology build optimization: cache query results to avoid repeated queries.
    
    * #1202
    
    Add elasticsearch batch process setting into application.yml.
    
    * #1202
    
    Fixed checkstyle errors.
    
    * #1202
    
    Use XContentFactory to build source to insert into elasticsearch.
---
 .../handler/ApplicationRegisterServiceHandler.java |   4 +-
 .../provider/handler/JVMMetricsServiceHandler.java |  21 ++--
 .../NetworkAddressRegisterServiceHandler.java      |   5 +-
 .../handler/TraceSegmentServiceHandler.java        |   5 +-
 .../NetworkAddressRegisterServletHandler.java      |  19 ++--
 .../handler/TraceSegmentServletHandler.java        |  20 ++--
 .../jvm/provider/service/GCMetricService.java      |   6 +-
 .../provider/AnalysisMetricModuleProvider.java     |  16 +--
 .../provider/service/InstanceHeartBeatService.java |   4 +-
 .../ApplicationComponentSpanListener.java          |   2 +-
 .../mapping/ApplicationMappingSpanListener.java    |  15 ++-
 .../worker/global/GlobalTraceSpanListener.java     |  10 +-
 .../mapping/InstanceMappingSpanListener.java       |  15 ++-
 .../segment/SegmentDurationSpanListener.java       |   6 +-
 .../heartbeat/ServiceNameAggregationWorker.java    |   2 +-
 .../ServiceNameHeartBeatPersistenceWorker.java     |   2 +-
 .../service/heartbeat/ServiceNameSpanListener.java |   2 +-
 .../ServiceReferenceMetricSpanListener.java        |  10 +-
 .../register/ApplicationRegisterRemoteWorker.java  |  17 ++-
 .../register/ApplicationRegisterSerialWorker.java  |  16 ++-
 .../register/InstanceRegisterRemoteWorker.java     |  16 ++-
 .../register/InstanceRegisterSerialWorker.java     |  16 ++-
 .../NetworkAddressRegisterRemoteWorker.java        |  17 ++-
 .../NetworkAddressRegisterSerialWorker.java        |  14 +--
 .../register/ServiceNameRegisterSerialWorker.java  |   5 +-
 .../provider/service/InstanceIDService.java        |  10 +-
 .../provider/buffer/SegmentBufferManager.java      |   5 +-
 .../provider/buffer/SegmentBufferReader.java       |  11 +-
 .../parser/provider/parser/SegmentParse.java       |  16 ++-
 .../provider/parser/SegmentPersistenceWorker.java  |   2 +-
 .../parser/standardization/SpanIdExchanger.java    |  12 ++-
 .../provider/service/SegmentBase64Printer.java     |   2 +-
 .../base/AbstractLocalAsyncWorkerProvider.java     |   2 +-
 .../analysis/worker/model/base/AbstractWorker.java |   2 +-
 .../worker/model/base/LocalAsyncWorkerRef.java     |   8 +-
 .../worker/model/base/RemoteWorkerRef.java         |   2 +-
 .../worker/model/impl/AggregationWorker.java       |  21 ++--
 .../worker/model/impl/MergePersistenceWorker.java  |  14 ++-
 .../model/impl/NonMergePersistenceWorker.java      |   4 +-
 .../worker/model/impl/PersistenceWorker.java       |   2 +-
 .../analysis/worker/timer/PersistenceTimer.java    |  25 +++--
 .../src/main/resources/application.yml             |   8 +-
 .../guava/service/ServiceIdCacheGuavaService.java  |   2 +-
 .../service/ServiceNameCacheGuavaService.java      |  13 ++-
 .../client/elasticsearch/ElasticSearchClient.java  |  22 ++--
 .../apm/collector/core/cache/Window.java           |   2 +-
 .../apm/collector/core/data/StreamData.java        |  16 ++-
 .../core/data/operator/NonMergeOperation.java      |   1 +
 .../apm/collector/instrument/MetricTree.java       |  12 +--
 .../collector/instrument/ServiceMetricTracing.java |  10 +-
 .../instrument/tools/ReportFormatter.java          |  27 ++---
 .../service/selector/ForeverFirstSelector.java     |   4 +-
 .../storage/base/dao/IPersistenceDAO.java          |   5 +-
 .../storage/table/alarm/ApplicationAlarm.java      |   5 +-
 .../storage/table/alarm/ApplicationAlarmList.java  |   5 +-
 .../table/alarm/ApplicationReferenceAlarm.java     |   5 +-
 .../table/alarm/ApplicationReferenceAlarmList.java |   5 +-
 .../storage/table/alarm/InstanceAlarm.java         |   5 +-
 .../storage/table/alarm/InstanceAlarmList.java     |   5 +-
 .../table/alarm/InstanceReferenceAlarm.java        |   5 +-
 .../table/alarm/InstanceReferenceAlarmList.java    |   5 +-
 .../storage/table/alarm/ServiceAlarm.java          |   5 +-
 .../storage/table/alarm/ServiceAlarmList.java      |   5 +-
 .../storage/table/alarm/ServiceReferenceAlarm.java |   5 +-
 .../table/alarm/ServiceReferenceAlarmList.java     |   5 +-
 .../table/application/ApplicationComponent.java    |   5 +-
 .../table/application/ApplicationMapping.java      |   5 +-
 .../table/application/ApplicationMetric.java       |   5 +-
 .../application/ApplicationReferenceMetric.java    |   5 +-
 .../storage/table/global/GlobalTrace.java          |   8 +-
 .../table/global/ResponseTimeDistribution.java     |   5 +-
 .../storage/table/instance/InstanceMapping.java    |   5 +-
 .../storage/table/instance/InstanceMetric.java     |   5 +-
 .../table/instance/InstanceReferenceMetric.java    |   5 +-
 .../apm/collector/storage/table/jvm/GCMetric.java  |   5 +-
 .../collector/storage/table/jvm/MemoryMetric.java  |   5 +-
 .../storage/table/jvm/MemoryPoolMetric.java        |   5 +-
 .../storage/table/register/Application.java        |   8 +-
 .../collector/storage/table/register/Instance.java |   5 +-
 .../storage/table/register/NetworkAddress.java     |   9 +-
 .../storage/table/register/ServiceName.java        |   5 +-
 .../collector/storage/table/segment/Segment.java   |   8 +-
 .../storage/table/service/ServiceMetric.java       |   5 +-
 .../table/service/ServiceReferenceMetric.java      |   6 +-
 .../collector/storage/es/MetricTransformUtil.java  |  41 +++----
 .../storage/es/StorageModuleEsConfig.java          |  36 +++++++
 .../storage/es/StorageModuleEsProvider.java        |   4 +-
 .../es/base/dao/AbstractPersistenceEsDAO.java      |  12 ++-
 .../collector/storage/es/base/dao/BatchEsDAO.java  |  72 -------------
 .../storage/es/base/dao/BatchProcessEsDAO.java     | 120 +++++++++++++++++++++
 .../es/dao/GlobalTraceEsPersistenceDAO.java        |  17 +--
 .../es/dao/InstanceHeartBeatEsPersistenceDAO.java  |  12 ++-
 .../es/dao/SegmentDurationEsPersistenceDAO.java    |  30 +++---
 .../storage/es/dao/SegmentEsPersistenceDAO.java    |  12 ++-
 .../dao/ServiceNameHeartBeatEsPersistenceDAO.java  |  30 ++++--
 ...stractApplicationComponentEsPersistenceDAO.java |  21 ++--
 ...stractApplicationAlarmListEsPersistenceDAO.java |  24 ++---
 .../alarm/ApplicationAlarmEsPersistenceDAO.java    |  22 ++--
 .../ApplicationReferenceAlarmEsPersistenceDAO.java |  24 ++---
 ...licationReferenceAlarmListEsPersistenceDAO.java |  25 ++---
 .../dao/alarm/InstanceAlarmEsPersistenceDAO.java   |  24 ++---
 .../alarm/InstanceAlarmListEsPersistenceDAO.java   |  24 ++---
 .../InstanceReferenceAlarmEsPersistenceDAO.java    |  28 ++---
 ...InstanceReferenceAlarmListEsPersistenceDAO.java |  28 ++---
 .../es/dao/alarm/ServiceAlarmEsPersistenceDAO.java |  26 ++---
 .../alarm/ServiceAlarmListEsPersistenceDAO.java    |  26 ++---
 .../ServiceReferenceAlarmEsPersistenceDAO.java     |  32 +++---
 .../ServiceReferenceAlarmListEsPersistenceDAO.java |  32 +++---
 .../AbstractApplicationMetricEsPersistenceDAO.java |  25 ++---
 ...AbstractApplicationMappingEsPersistenceDAO.java |  21 ++--
 ...ApplicationReferenceMetricEsPersistenceDAO.java |  26 ++---
 .../dao/cpu/AbstractCpuMetricEsPersistenceDAO.java |  23 ++--
 .../dao/gc/AbstractGCMetricEsPersistenceDAO.java   |  25 ++---
 .../AbstractInstanceMetricEsPersistenceDAO.java    |  17 +--
 .../AbstractInstanceMappingEsPersistenceDAO.java   |  23 ++--
 ...actInstanceReferenceMetricEsPersistenceDAO.java |  23 ++--
 .../AbstractMemoryMetricEsPersistenceDAO.java      |  31 +++---
 .../AbstractMemoryPoolMetricEsPersistenceDAO.java  |  32 +++---
 ...ctResponseTimeDistributionEsPersistenceDAO.java |  26 ++---
 .../smp/AbstractServiceMetricEsPersistenceDAO.java |  19 ++--
 ...ractServiceReferenceMetricEsPersistenceDAO.java |  27 ++---
 .../es/dao/ui/ApplicationComponentEsUIDAO.java     |   2 +-
 .../es/dao/ui/ApplicationMappingEsUIDAO.java       |   2 +-
 .../es/dao/ui/ApplicationMetricEsUIDAO.java        |   2 +-
 .../dao/ui/ApplicationReferenceMetricEsUIDAO.java  |   2 +-
 .../storage/es/dao/ui/InstanceEsUIDAO.java         |   6 +-
 .../collector/storage/h2/base/dao/BatchH2DAO.java  |  12 ++-
 .../collector/ui/jetty/handler/GraphQLHandler.java |  53 +++------
 .../ui/service/ApplicationTopologyService.java     |   2 +-
 .../ui/service/ClusterTopologyService.java         |   2 +-
 .../apm/collector/ui/service/TopologyBuilder.java  |  50 +++++----
 .../collector/ui/service/TopologyBuilderTest.java  |   3 +-
 132 files changed, 1069 insertions(+), 801 deletions(-)

diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java
index 9ca2ca2..7d58e57 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/ApplicationRegisterServiceHandler.java
@@ -45,7 +45,9 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
 
     @Override
     public void applicationCodeRegister(Application request, StreamObserver<ApplicationMapping> responseObserver) {
-        logger.debug("register application");
+        if (logger.isDebugEnabled()) {
+            logger.debug("register application");
+        }
 
         ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
         String applicationCode = request.getApplicationCode();
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java
index 05d728f..54dfb22 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/JVMMetricsServiceHandler.java
@@ -21,24 +21,14 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
 import org.apache.skywalking.apm.collector.analysis.jvm.define.AnalysisJVMModule;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryMetricService;
-import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryPoolMetricService;
+import org.apache.skywalking.apm.collector.analysis.jvm.define.service.*;
 import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
 import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
-import org.apache.skywalking.apm.network.proto.CPU;
-import org.apache.skywalking.apm.network.proto.Downstream;
-import org.apache.skywalking.apm.network.proto.GC;
-import org.apache.skywalking.apm.network.proto.JVMMetrics;
-import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
-import org.apache.skywalking.apm.network.proto.Memory;
-import org.apache.skywalking.apm.network.proto.MemoryPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.apm.network.proto.*;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -63,7 +53,10 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe
 
     @Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
         int instanceId = request.getApplicationInstanceId();
-        logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
+        }
 
         request.getMetricsList().forEach(metric -> {
             long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getTime());
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java
index d925d3d..35d46ba 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/NetworkAddressRegisterServiceHandler.java
@@ -46,7 +46,10 @@ public class NetworkAddressRegisterServiceHandler extends NetworkAddressRegister
 
     @Override
     public void batchRegister(NetworkAddresses request, StreamObserver<NetworkAddressMappings> responseObserver) {
-        logger.debug("register application");
+        if (logger.isDebugEnabled()) {
+            logger.debug("register application");
+        }
+
         ProtocolStringList addressesList = request.getAddressesList();
 
         NetworkAddressMappings.Builder builder = NetworkAddressMappings.newBuilder();
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java
index 4298e79..e8fd6dc 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/TraceSegmentServiceHandler.java
@@ -44,7 +44,10 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
     @Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
         return new StreamObserver<UpstreamSegment>() {
             @Override public void onNext(UpstreamSegment segment) {
-                logger.debug("receive segment");
+                if (logger.isDebugEnabled()) {
+                    logger.debug("receive segment");
+                }
+
                 segmentParseService.parse(segment, ISegmentParseService.Source.Agent);
 
                 if (debug) {
diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java
index 4fc8852..1da81e2 100644
--- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java
+++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/NetworkAddressRegisterServletHandler.java
@@ -18,19 +18,14 @@
 
 package org.apache.skywalking.apm.collector.agent.jetty.provider.handler;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+import com.google.gson.*;
 import java.io.IOException;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
 import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
 import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -52,17 +47,21 @@ public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {
         return "/networkAddress/register";
     }
 
-    @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
+    @Override protected JsonElement doGet(HttpServletRequest req) {
         throw new UnsupportedOperationException();
     }
 
-    @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
+    @Override protected JsonElement doPost(HttpServletRequest req) {
         JsonArray responseArray = new JsonArray();
         try {
             JsonArray networkAddresses = gson.fromJson(req.getReader(), JsonArray.class);
             for (int i = 0; i < networkAddresses.size(); i++) {
                 String networkAddress = networkAddresses.get(i).getAsString();
-                logger.debug("network address register, network address: {}", networkAddress);
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("network address register, network address: {}", networkAddress);
+                }
+
                 int addressId = networkAddressIDService.get(networkAddress);
                 JsonObject mapping = new JsonObject();
                 mapping.addProperty(ADDRESS_ID, addressId);
diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java
index 169329b..83f9143 100644
--- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java
+++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/handler/TraceSegmentServletHandler.java
@@ -20,18 +20,14 @@ package org.apache.skywalking.apm.collector.agent.jetty.provider.handler;
 
 import com.google.gson.JsonElement;
 import com.google.gson.stream.JsonReader;
-import java.io.BufferedReader;
-import java.io.IOException;
+import java.io.*;
 import javax.servlet.http.HttpServletRequest;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.reader.TraceSegment;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.reader.TraceSegmentJsonReader;
+import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.reader.*;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
 import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -50,18 +46,22 @@ public class TraceSegmentServletHandler extends JettyJsonHandler {
         return "/segments";
     }
 
-    @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
+    @Override protected JsonElement doGet(HttpServletRequest req) {
         throw new UnsupportedOperationException();
     }
 
-    @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
-        logger.debug("receive stream segment");
+    @Override protected JsonElement doPost(HttpServletRequest req) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("receive stream segment");
+        }
+
         try {
             BufferedReader bufferedReader = req.getReader();
             read(bufferedReader);
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
         }
+        
         return null;
     }
 
diff --git a/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java b/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java
index 9bc73d3..38c252f 100644
--- a/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java
+++ b/apm-collector/apm-collector-analysis/analysis-jvm/jvm-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/jvm/provider/service/GCMetricService.java
@@ -34,7 +34,7 @@ import static java.util.Objects.isNull;
  */
 public class GCMetricService implements IGCMetricService {
 
-    private final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
+    private static final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
 
     private Graph<GCMetric> gcMetricGraph;
 
@@ -59,7 +59,9 @@ public class GCMetricService implements IGCMetricService {
         gcMetric.setTimes(1L);
         gcMetric.setTimeBucket(timeBucket);
 
-        logger.debug("push to gc metric graph, id: {}", gcMetric.getId());
+        if (logger.isDebugEnabled()) {
+            logger.debug("push to gc metric graph, id: {}", gcMetric.getId());
+        }
         getGcMetricGraph().start(gcMetric);
     }
 }
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
index 35db2d9..4258a28 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/AnalysisMetricModuleProvider.java
@@ -100,14 +100,14 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
 
     private void segmentParserListenerRegister() {
         ISegmentParserListenerRegister segmentParserListenerRegister = getManager().find(AnalysisSegmentParserModule.NAME).getService(ISegmentParserListenerRegister.class);
-        segmentParserListenerRegister.register(new ServiceReferenceMetricSpanListener.Factory());
-        segmentParserListenerRegister.register(new ApplicationComponentSpanListener.Factory());
-        segmentParserListenerRegister.register(new ApplicationMappingSpanListener.Factory());
-        segmentParserListenerRegister.register(new InstanceMappingSpanListener.Factory());
-        segmentParserListenerRegister.register(new GlobalTraceSpanListener.Factory());
-        segmentParserListenerRegister.register(new SegmentDurationSpanListener.Factory());
-        segmentParserListenerRegister.register(new ResponseTimeDistributionSpanListener.Factory());
-        segmentParserListenerRegister.register(new ServiceNameSpanListener.Factory());
+        segmentParserListenerRegister.register(new ServiceReferenceMetricSpanListener.Factory()); //11000TPS
+        segmentParserListenerRegister.register(new ApplicationComponentSpanListener.Factory()); //17000TPS
+        segmentParserListenerRegister.register(new ApplicationMappingSpanListener.Factory()); //22000TPS
+        segmentParserListenerRegister.register(new InstanceMappingSpanListener.Factory()); //22000TPS
+        segmentParserListenerRegister.register(new GlobalTraceSpanListener.Factory()); //15000TPS
+        segmentParserListenerRegister.register(new SegmentDurationSpanListener.Factory()); //13000TPS
+        segmentParserListenerRegister.register(new ResponseTimeDistributionSpanListener.Factory()); //25000TPS
+        segmentParserListenerRegister.register(new ServiceNameSpanListener.Factory()); //20000TPS
     }
 
     private void graphCreate(WorkerCreateListener workerCreateListener) {
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java
index 48bd92f..1409f02 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/service/InstanceHeartBeatService.java
@@ -51,7 +51,9 @@ public class InstanceHeartBeatService implements IInstanceHeartBeatService {
         instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
         instance.setInstanceId(instanceId);
 
-        logger.debug("push to instance heart beat persistence worker, id: {}", instance.getId());
+        if (logger.isDebugEnabled()) {
+            logger.debug("push to instance heart beat persistence worker, id: {}", instance.getId());
+        }
         getHeartBeatGraph().start(instance);
     }
 }
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java
index dc095c5..4511305 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/component/ApplicationComponentSpanListener.java
@@ -36,7 +36,7 @@ import org.apache.skywalking.apm.collector.storage.table.application.Application
 public class ApplicationComponentSpanListener implements EntrySpanListener, ExitSpanListener {
 
     private final ApplicationCacheService applicationCacheService;
-    private List<ApplicationComponent> applicationComponents = new LinkedList<>();
+    private final List<ApplicationComponent> applicationComponents = new LinkedList<>();
 
     private ApplicationComponentSpanListener(ModuleManager moduleManager) {
         this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java
index f107ad7..c30a29a 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/application/mapping/ApplicationMappingSpanListener.java
@@ -51,7 +51,10 @@ public class ApplicationMappingSpanListener implements EntrySpanListener {
     }
 
     @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
-        logger.debug("application mapping listener parse reference");
+        if (logger.isDebugEnabled()) {
+            logger.debug("application mapping listener parse reference");
+        }
+
         if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
             if (spanDecorator.getRefsCount() > 0) {
                 for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
@@ -73,10 +76,16 @@ public class ApplicationMappingSpanListener implements EntrySpanListener {
     }
 
     @Override public void build() {
-        logger.debug("application mapping listener build");
+        if (logger.isDebugEnabled()) {
+            logger.debug("application mapping listener build");
+        }
+
         Graph<ApplicationMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.APPLICATION_MAPPING_GRAPH_ID, ApplicationMapping.class);
         applicationMappings.forEach(applicationMapping -> {
-            logger.debug("push to application mapping aggregation worker, id: {}", applicationMapping.getId());
+            if (logger.isDebugEnabled()) {
+                logger.debug("push to application mapping aggregation worker, id: {}", applicationMapping.getId());
+            }
+
             graph.start(applicationMapping);
         });
     }
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java
index 0a48369..655fb4d 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/global/GlobalTraceSpanListener.java
@@ -37,7 +37,7 @@ public class GlobalTraceSpanListener implements GlobalTraceIdsListener {
 
     private static final Logger logger = LoggerFactory.getLogger(GlobalTraceSpanListener.class);
 
-    private List<String> globalTraceIds = new LinkedList<>();
+    private final List<String> globalTraceIds = new LinkedList<>();
     private SegmentCoreInfo segmentCoreInfo;
 
     @Override public boolean containsPoint(Point point) {
@@ -58,17 +58,19 @@ public class GlobalTraceSpanListener implements GlobalTraceIdsListener {
     }
 
     @Override public void build() {
-        logger.debug("global trace listener build");
+        if (logger.isDebugEnabled()) {
+            logger.debug("global trace listener build");
+        }
 
         Graph<GlobalTrace> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class);
-        for (String globalTraceId : globalTraceIds) {
+        globalTraceIds.forEach(globalTraceId -> {
             GlobalTrace globalTrace = new GlobalTrace();
             globalTrace.setId(segmentCoreInfo.getSegmentId() + Const.ID_SPLIT + globalTraceId);
             globalTrace.setTraceId(globalTraceId);
             globalTrace.setSegmentId(segmentCoreInfo.getSegmentId());
             globalTrace.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
             graph.start(globalTrace);
-        }
+        });
     }
 
     public static class Factory implements SpanListenerFactory {
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java
index a620870..8213a32 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/instance/mapping/InstanceMappingSpanListener.java
@@ -43,7 +43,10 @@ public class InstanceMappingSpanListener implements EntrySpanListener {
     }
 
     @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
-        logger.debug("instance mapping listener parse reference");
+        if (logger.isDebugEnabled()) {
+            logger.debug("instance mapping listener parse reference");
+        }
+
         if (spanDecorator.getRefsCount() > 0) {
             for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
                 InstanceMapping instanceMapping = new InstanceMapping();
@@ -60,10 +63,16 @@ public class InstanceMappingSpanListener implements EntrySpanListener {
     }
 
     @Override public void build() {
-        logger.debug("instance mapping listener build");
+        if (logger.isDebugEnabled()) {
+            logger.debug("instance mapping listener build");
+        }
+
         Graph<InstanceMapping> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.INSTANCE_MAPPING_GRAPH_ID, InstanceMapping.class);
         instanceMappings.forEach(instanceMapping -> {
-            logger.debug("push to instance mapping aggregation worker, id: {}", instanceMapping.getId());
+            if (logger.isDebugEnabled()) {
+                logger.debug("push to instance mapping aggregation worker, id: {}", instanceMapping.getId());
+            }
+
             graph.start(instanceMapping);
         });
     }
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
index 30269c4..c9e7b13 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
@@ -76,7 +76,11 @@ public class SegmentDurationSpanListener implements FirstSpanListener, EntrySpan
 
     @Override public void build() {
         Graph<SegmentDuration> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SEGMENT_DURATION_GRAPH_ID, SegmentDuration.class);
-        logger.debug("segment duration listener build");
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("segment duration listener build");
+        }
+
         if (entryOperationNameIds.size() == 0) {
             segmentDuration.getServiceName().add(serviceNameCacheService.get(firstOperationNameId).getServiceName());
         } else {
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java
index 1079bbc..ed5c7d7 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameAggregationWorker.java
@@ -49,7 +49,7 @@ public class ServiceNameAggregationWorker extends AggregationWorker<ServiceName,
         }
 
         @Override public int queueSize() {
-            return 256;
+            return 4096;
         }
     }
 
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java
index 1491c95..2c2b6f1 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameHeartBeatPersistenceWorker.java
@@ -61,7 +61,7 @@ public class ServiceNameHeartBeatPersistenceWorker extends MergePersistenceWorke
 
         @Override
         public int queueSize() {
-            return 1024;
+            return 4096;
         }
     }
 
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java
index 44a8ca6..261f163 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/heartbeat/ServiceNameSpanListener.java
@@ -32,7 +32,7 @@ import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
  */
 public class ServiceNameSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener {
 
-    private List<ServiceName> serviceNames;
+    private final List<ServiceName> serviceNames;
 
     private ServiceNameSpanListener() {
         this.serviceNames = new LinkedList<>();
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java
index 2d4fbcb..fe03954 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/service/refmetric/ServiceReferenceMetricSpanListener.java
@@ -148,7 +148,10 @@ public class ServiceReferenceMetricSpanListener implements EntrySpanListener, Ex
     }
 
     @Override public void build() {
-        logger.debug("service reference listener build");
+        if (logger.isDebugEnabled()) {
+            logger.debug("service reference listener build");
+        }
+
         Graph<ServiceReferenceMetric> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class);
         entryReferenceMetric.forEach(serviceReferenceMetric -> {
             String metricId = serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
@@ -157,7 +160,10 @@ public class ServiceReferenceMetricSpanListener implements EntrySpanListener, Ex
             serviceReferenceMetric.setId(id);
             serviceReferenceMetric.setMetricId(metricId);
             serviceReferenceMetric.setTimeBucket(minuteTimeBucket);
-            logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId());
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId());
+            }
 
             graph.start(serviceReferenceMetric);
         });
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java
index 6ab9f48..147166e 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterRemoteWorker.java
@@ -19,15 +19,11 @@
 package org.apache.skywalking.apm.collector.analysis.register.provider.register;
 
 import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
-import org.apache.skywalking.apm.collector.remote.service.Selector;
+import org.apache.skywalking.apm.collector.remote.service.*;
 import org.apache.skywalking.apm.collector.storage.table.register.Application;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -44,8 +40,11 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker<Applic
         return WorkerIdDefine.APPLICATION_REGISTER_REMOTE_WORKER;
     }
 
-    @Override protected void onWork(Application message) throws WorkerException {
-        logger.debug("application code: {}", message.getApplicationCode());
+    @Override protected void onWork(Application message) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("application code: {}", message.getApplicationCode());
+        }
+
         onNext(message);
     }
 
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java
index 4b34ffc..7db65e1 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ApplicationRegisterSerialWorker.java
@@ -19,19 +19,15 @@
 package org.apache.skywalking.apm.collector.analysis.register.provider.register;
 
 import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
 import org.apache.skywalking.apm.collector.cache.CacheModule;
 import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
-import org.apache.skywalking.apm.collector.core.util.Const;
+import org.apache.skywalking.apm.collector.core.util.*;
 import org.apache.skywalking.apm.collector.storage.StorageModule;
 import org.apache.skywalking.apm.collector.storage.dao.register.IApplicationRegisterDAO;
 import org.apache.skywalking.apm.collector.storage.table.register.Application;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -53,8 +49,10 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker<Ap
         return WorkerIdDefine.APPLICATION_REGISTER_SERIAL_WORKER;
     }
 
-    @Override protected void onWork(Application application) throws WorkerException {
-        logger.debug("register application, application code: {}", application.getApplicationCode());
+    @Override protected void onWork(Application application) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("register application, application code: {}", application.getApplicationCode());
+        }
 
         int applicationId;
 
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java
index 9d43322..0b9da8a 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterRemoteWorker.java
@@ -19,15 +19,11 @@
 package org.apache.skywalking.apm.collector.analysis.register.provider.register;
 
 import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
-import org.apache.skywalking.apm.collector.remote.service.Selector;
+import org.apache.skywalking.apm.collector.remote.service.*;
 import org.apache.skywalking.apm.collector.storage.table.register.Instance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -44,8 +40,10 @@ public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker<Instance,
         super(moduleManager);
     }
 
-    @Override protected void onWork(Instance instance) throws WorkerException {
-        logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
+    @Override protected void onWork(Instance instance) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
+        }
         onNext(instance);
     }
 
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java
index 1880b2b..8aa02f1 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/InstanceRegisterSerialWorker.java
@@ -19,19 +19,15 @@
 package org.apache.skywalking.apm.collector.analysis.register.provider.register;
 
 import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
 import org.apache.skywalking.apm.collector.cache.CacheModule;
 import org.apache.skywalking.apm.collector.cache.service.InstanceCacheService;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
-import org.apache.skywalking.apm.collector.core.util.Const;
+import org.apache.skywalking.apm.collector.core.util.*;
 import org.apache.skywalking.apm.collector.storage.StorageModule;
 import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
 import org.apache.skywalking.apm.collector.storage.table.register.Instance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -53,8 +49,10 @@ public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker<Insta
         return WorkerIdDefine.INSTANCE_REGISTER_SERIAL_WORKER;
     }
 
-    @Override protected void onWork(Instance instance) throws WorkerException {
-        logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
+    @Override protected void onWork(Instance instance) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
+        }
 
         int instanceId;
         if (BooleanUtils.valueToBoolean(instance.getIsAddress())) {
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java
index 6c720fe..69c6784 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterRemoteWorker.java
@@ -19,15 +19,11 @@
 package org.apache.skywalking.apm.collector.analysis.register.provider.register;
 
 import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
-import org.apache.skywalking.apm.collector.remote.service.Selector;
+import org.apache.skywalking.apm.collector.remote.service.*;
 import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -44,8 +40,11 @@ public class NetworkAddressRegisterRemoteWorker extends AbstractRemoteWorker<Net
         return WorkerIdDefine.NETWORK_ADDRESS_REGISTER_REMOTE_WORKER;
     }
 
-    @Override protected void onWork(NetworkAddress message) throws WorkerException {
-        logger.debug("network address: {}", message.getNetworkAddress());
+    @Override protected void onWork(NetworkAddress message) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("network address: {}", message.getNetworkAddress());
+        }
+
         onNext(message);
     }
 
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java
index 728a581..1b1b97f 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/NetworkAddressRegisterSerialWorker.java
@@ -19,17 +19,14 @@
 package org.apache.skywalking.apm.collector.analysis.register.provider.register;
 
 import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
 import org.apache.skywalking.apm.collector.cache.CacheModule;
 import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheService;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.storage.StorageModule;
 import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
 import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -51,8 +48,11 @@ public class NetworkAddressRegisterSerialWorker extends AbstractLocalAsyncWorker
         return WorkerIdDefine.NETWORK_ADDRESS_REGISTER_SERIAL_WORKER;
     }
 
-    @Override protected void onWork(NetworkAddress networkAddress) throws WorkerException {
-        logger.debug("register network address, address: {}", networkAddress.getNetworkAddress());
+    @Override protected void onWork(NetworkAddress networkAddress) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("register network address, address: {}", networkAddress.getNetworkAddress());
+        }
+
         if (networkAddress.getAddressId() == 0) {
             int addressId = networkAddressCacheService.getAddressId(networkAddress.getNetworkAddress());
 
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java
index 800a49a..c90ad78 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/register/ServiceNameRegisterSerialWorker.java
@@ -50,7 +50,10 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
     }
 
     @Override protected void onWork(ServiceName serviceName) {
-        logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
+        if (logger.isDebugEnabled()) {
+            logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
+        }
+
         int serviceId = serviceIdCacheService.get(serviceName.getApplicationId(), serviceName.getSrcSpanType(), serviceName.getServiceName());
         if (serviceId == 0) {
             long now = System.currentTimeMillis();
diff --git a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java
index b4cbf98..400a033 100644
--- a/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java
+++ b/apm-collector/apm-collector-analysis/analysis-register/register-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/register/provider/service/InstanceIDService.java
@@ -73,7 +73,10 @@ public class InstanceIDService implements IInstanceIDService {
     }
 
     @Override public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo) {
-        logger.debug("get or getOrCreate instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
+        if (logger.isDebugEnabled()) {
+            logger.debug("get or getOrCreate instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
+        }
+
         int instanceId = getInstanceCacheService().getInstanceIdByAgentUUID(applicationId, agentUUID);
 
         if (instanceId == 0) {
@@ -95,7 +98,10 @@ public class InstanceIDService implements IInstanceIDService {
     }
 
     @Override public int getOrCreateByAddressId(int applicationId, int addressId, long registerTime) {
-        logger.debug("get or getOrCreate instance id by address id, application id: {}, address id: {}, registerTime: {}", applicationId, addressId, registerTime);
+        if (logger.isDebugEnabled()) {
+            logger.debug("get or getOrCreate instance id by address id, application id: {}, address id: {}, registerTime: {}", applicationId, addressId, registerTime);
+        }
+
         int instanceId = getInstanceCacheService().getInstanceIdByAddressId(applicationId, addressId);
 
         if (instanceId == 0) {
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java
index 09fc97c..75f5cc6 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferManager.java
@@ -97,7 +97,10 @@ public enum SegmentBufferManager {
     }
 
     private void newDataFile() throws IOException {
-        logger.debug("getOrCreate new segment buffer file");
+        if (logger.isDebugEnabled()) {
+            logger.debug("getOrCreate new segment buffer file");
+        }
+
         String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
         String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
         File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java
index 79cc25e..d1ce1c4 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/buffer/SegmentBufferReader.java
@@ -109,7 +109,10 @@ public enum SegmentBufferReader {
         }
 
         for (File dataFile : dataFiles) {
-            logger.debug("Reading segment buffer data file, file name: {}", dataFile.getAbsolutePath());
+            if (logger.isDebugEnabled()) {
+                logger.debug("Reading segment buffer data file, file name: {}", dataFile.getAbsolutePath());
+            }
+
             OffsetManager.INSTANCE.setReadOffset(dataFile.getName(), 0);
             if (!read(dataFile, 0)) {
                 break;
@@ -137,7 +140,11 @@ public enum SegmentBufferReader {
 
                 final int serialized = upstreamSegment.getSerializedSize();
                 readFileOffset = readFileOffset + CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
-                logger.debug("read segment buffer from file: {}, offset: {}, file length: {}", readFile.getName(), readFileOffset, readFile.length());
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("read segment buffer from file: {}, offset: {}, file length: {}", readFile.getName(), readFileOffset, readFile.length());
+                }
+
                 OffsetManager.INSTANCE.setReadOffset(readFileOffset);
             }
 
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
index 85acfc0..335cbab 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
@@ -41,7 +41,7 @@ public class SegmentParse {
     private static final Logger logger = LoggerFactory.getLogger(SegmentParse.class);
 
     private final ModuleManager moduleManager;
-    private List<SpanListener> spanListeners;
+    private final List<SpanListener> spanListeners;
     private final SegmentParserListenerManager listenerManager;
     private final SegmentCoreInfo segmentCoreInfo;
 
@@ -65,14 +65,19 @@ public class SegmentParse {
             SegmentDecorator segmentDecorator = new SegmentDecorator(segmentObject);
 
             if (!preBuild(traceIds, segmentDecorator)) {
-                logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("This segment id exchange not success, write to buffer file, id: {}", segmentCoreInfo.getSegmentId());
+                }
 
                 if (source.equals(ISegmentParseService.Source.Agent)) {
                     writeToBufferFile(segmentCoreInfo.getSegmentId(), segment);
                 }
                 return false;
             } else {
-                logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
+                }
+
                 notifyListenerToBuild();
                 buildSegment(segmentCoreInfo.getSegmentId(), segmentDecorator.toByteArray());
                 return true;
@@ -167,7 +172,10 @@ public class SegmentParse {
 
     @GraphComputingMetric(name = "/segment/parse/bufferFile/write")
     private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
-        logger.debug("push to segment buffer write worker, id: {}", id);
+        if (logger.isDebugEnabled()) {
+            logger.debug("push to segment buffer write worker, id: {}", id);
+        }
+
         SegmentStandardization standardization = new SegmentStandardization(id);
         standardization.setUpstreamSegment(upstreamSegment);
         Graph<SegmentStandardization> graph = GraphManager.INSTANCE.findGraph(GraphIdDefine.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class);
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java
index 8803e2d..f811a34 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentPersistenceWorker.java
@@ -59,7 +59,7 @@ public class SegmentPersistenceWorker extends NonMergePersistenceWorker<Segment>
 
         @Override
         public int queueSize() {
-            return 1024;
+            return 2048;
         }
     }
 }
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java
index b3b0b59..0465a9e 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/standardization/SpanIdExchanger.java
@@ -62,7 +62,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
             int componentId = componentLibraryCatalogService.getComponentId(standardBuilder.getComponent());
 
             if (componentId == 0) {
-                logger.debug("component: {} in application: {} exchange failed", standardBuilder.getComponent(), applicationId);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("component: {} in application: {} exchange failed", standardBuilder.getComponent(), applicationId);
+                }
                 return false;
             } else {
                 standardBuilder.toBuilder();
@@ -75,7 +77,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
             int peerId = networkAddressIDService.getOrCreate(standardBuilder.getPeer());
 
             if (peerId == 0) {
-                logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("peer: {} in application: {} exchange failed", standardBuilder.getPeer(), applicationId);
+                }
                 return false;
             } else {
                 standardBuilder.toBuilder();
@@ -93,7 +97,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
             int operationNameId = serviceNameService.getOrCreate(applicationId, standardBuilder.getSpanTypeValue(), operationName);
 
             if (operationNameId == 0) {
-                logger.debug("service name: {} from application id: {} exchange failed", operationName, applicationId);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("service name: {} from application id: {} exchange failed", operationName, applicationId);
+                }
                 return false;
             } else {
                 standardBuilder.toBuilder();
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java
index f41d248..2fdd346 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/test/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/service/SegmentBase64Printer.java
@@ -35,7 +35,7 @@ public class SegmentBase64Printer {
     private static final Logger LOGGER = LoggerFactory.getLogger(SegmentBase64Printer.class);
 
     public static void main(String[] args) throws InvalidProtocolBufferException {
-        String segmentBase64 = "CgwKCgMXjPKUga3WgBsSvAEIARiF7Jq1nywgp+yatZ8sKlASDAoKAnPAqKD5rNaAGxgBIAIqDjEyNy4wLjAuMTo5MDkyOAJCFC9zZW5kTWVzc2FnZS97Y291bnR9UhQvc2VuZE1lc3NhZ2Uve2NvdW50fTocS2Fma2EvVHJhY2UtdG9waWMtMS9Db25zdW1lclgEYBt6GwoJbXEuYnJva2VyEg4xMjcuMC4wLjE6OTA5MnoZCghtcS50b3BpYxINVHJhY2UtdG9waWMtMRImEP///////////wEY/+uatZ8sILTsmrWfLDD///////////8BUAIYAiAD";
+        String segmentBase64 = "CgoKCJbf2NPCLBAQEiAQ////////////ARiV39jTwiwg2+7Y08IsMNQPWANgARIlCAEYn9/Y08IsILns2NPCLDCUyAJA////////////AVABWAJgAxInCAIQARif39jTwiwguezY08IsMJTIAkD///////////8BUAFYAmADEicIAxACGJ/f2NPCLCC57NjTwiwwlMgCQP///////////wFQAVgCYAMSJwgEEAMYn9/Y08IsILns2NPCLDCUyAJA////////////AVABWAJgAxInCAUQBBif39jTwiwguezY08IsMJTIAkD///////////8BUAFYAmADEicIBhAFGJ/f2NPCLCC57NjTwiwwlMgCQP///////////wFQAVgCYAMSJwgHEAYYn9/Y08IsILns2NPCLDCUyAJA////////////AVABWAJgAxInCAgQBxif39jTwiwg [...]
         byte[] binarySegment = Base64.getDecoder().decode(segmentBase64);
         TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(binarySegment);
 
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java
index 472f4d1..3a62831 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractLocalAsyncWorkerProvider.java
@@ -39,7 +39,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends QueueData,
         workerCreateListener.addWorker(localAsyncWorker);
 
         LocalAsyncWorkerRef<INPUT, OUTPUT> localAsyncWorkerRef = new LocalAsyncWorkerRef<>(localAsyncWorker);
-        DataCarrier<INPUT> dataCarrier = new DataCarrier<>(1, queueSize());
+        DataCarrier<INPUT> dataCarrier = new DataCarrier<>(1, 10000);
         localAsyncWorkerRef.setQueueEventHandler(dataCarrier);
         dataCarrier.consume(localAsyncWorkerRef, 1);
         return localAsyncWorkerRef;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java
index e3b697b..1c0d9a0 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/AbstractWorker.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractWorker<INPUT, OUTPUT> implements NodeProcessor<INPUT, OUTPUT> {
 
-    private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
+    private static final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
 
     private final ModuleManager moduleManager;
 
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java
index 7da4828..7dcc19d 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/LocalAsyncWorkerRef.java
@@ -18,23 +18,21 @@
 
 package org.apache.skywalking.apm.collector.analysis.worker.model.base;
 
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
 import org.apache.skywalking.apm.collector.core.data.QueueData;
 import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
 import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public class LocalAsyncWorkerRef<INPUT extends QueueData, OUTPUT extends QueueData> extends WorkerRef<INPUT, OUTPUT> implements IConsumer<INPUT> {
 
-    private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerRef.class);
+    private static final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerRef.class);
 
     private DataCarrier<INPUT> dataCarrier;
 
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java
index 28e3847..78410c4 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/base/RemoteWorkerRef.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
  */
 public class RemoteWorkerRef<INPUT extends RemoteData, OUTPUT extends RemoteData> extends WorkerRef<INPUT, OUTPUT> {
 
-    private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
+    private static final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
 
     private final AbstractRemoteWorker<INPUT, OUTPUT> remoteWorker;
     private final RemoteSenderService remoteSenderService;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java
index 93498f8..8639b78 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/AggregationWorker.java
@@ -18,22 +18,20 @@
 
 package org.apache.skywalking.apm.collector.analysis.worker.model.impl;
 
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
-import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
+import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
 import org.apache.skywalking.apm.collector.analysis.worker.model.impl.data.MergeDataCache;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
  */
 public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends StreamData> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
 
-    private final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
+    private static final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
 
-    private MergeDataCache<OUTPUT> mergeDataCache;
+    private final MergeDataCache<OUTPUT> mergeDataCache;
     private int messageNum;
 
     public AggregationWorker(ModuleManager moduleManager) {
@@ -52,13 +50,10 @@ public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends
         messageNum++;
         aggregate(output);
 
-        if (messageNum >= 100) {
+        if (messageNum >= 1000 || message.getEndOfBatchContext().isEndOfBatch()) {
             sendToNext();
             messageNum = 0;
         }
-        if (message.getEndOfBatchContext().isEndOfBatch()) {
-            sendToNext();
-        }
     }
 
     private void sendToNext() throws WorkerException {
@@ -70,8 +65,12 @@ public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends
                 throw new WorkerException(e.getMessage(), e);
             }
         }
+
         mergeDataCache.getLast().collection().forEach((String id, OUTPUT data) -> {
-            logger.debug(data.toString());
+            if (logger.isDebugEnabled()) {
+                logger.debug(data.toString());
+            }
+
             onNext(data);
         });
         mergeDataCache.finishReadingLast();
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java
index 2fc7d80..ae4a14e 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/MergePersistenceWorker.java
@@ -32,7 +32,7 @@ import static java.util.Objects.nonNull;
  */
 public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData> extends PersistenceWorker<INPUT_AND_OUTPUT, MergeDataCollection<INPUT_AND_OUTPUT>> {
 
-    private final Logger logger = LoggerFactory.getLogger(MergePersistenceWorker.class);
+    private static final Logger logger = LoggerFactory.getLogger(MergePersistenceWorker.class);
 
     private final MergeDataCache<INPUT_AND_OUTPUT> mergeDataCache;
 
@@ -46,22 +46,21 @@ public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData
     }
 
     @Override protected List<Object> prepareBatch(MergeDataCollection<INPUT_AND_OUTPUT> collection) {
-        List<Object> insertBatchCollection = new LinkedList<>();
-        List<Object> updateBatchCollection = new LinkedList<>();
+        List<Object> batchCollection = new LinkedList<>();
         collection.collection().forEach((id, data) -> {
             if (needMergeDBData()) {
                 INPUT_AND_OUTPUT dbData = persistenceDAO().get(id);
                 if (nonNull(dbData)) {
                     dbData.mergeAndFormulaCalculateData(data);
                     try {
-                        updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(dbData));
+                        batchCollection.add(persistenceDAO().prepareBatchUpdate(dbData));
                         onNext(dbData);
                     } catch (Throwable t) {
                         logger.error(t.getMessage(), t);
                     }
                 } else {
                     try {
-                        insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
+                        batchCollection.add(persistenceDAO().prepareBatchInsert(data));
                         onNext(data);
                     } catch (Throwable t) {
                         logger.error(t.getMessage(), t);
@@ -69,7 +68,7 @@ public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData
                 }
             } else {
                 try {
-                    insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
+                    batchCollection.add(persistenceDAO().prepareBatchInsert(data));
                     onNext(data);
                 } catch (Throwable t) {
                     logger.error(t.getMessage(), t);
@@ -77,8 +76,7 @@ public abstract class MergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData
             }
         });
 
-        insertBatchCollection.addAll(updateBatchCollection);
-        return insertBatchCollection;
+        return batchCollection;
     }
 
     @Override protected void cacheData(INPUT_AND_OUTPUT input) {
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java
index 86e3eac..767e092 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/NonMergePersistenceWorker.java
@@ -30,7 +30,7 @@ import org.slf4j.*;
  */
 public abstract class NonMergePersistenceWorker<INPUT_AND_OUTPUT extends StreamData> extends PersistenceWorker<INPUT_AND_OUTPUT, NonMergeDataCollection<INPUT_AND_OUTPUT>> {
 
-    private final Logger logger = LoggerFactory.getLogger(NonMergePersistenceWorker.class);
+    private static final Logger logger = LoggerFactory.getLogger(NonMergePersistenceWorker.class);
 
     private final NonMergeDataCache<INPUT_AND_OUTPUT> mergeDataCache;
 
@@ -50,7 +50,7 @@ public abstract class NonMergePersistenceWorker<INPUT_AND_OUTPUT extends StreamD
     }
 
     @Override protected List<Object> prepareBatch(NonMergeDataCollection<INPUT_AND_OUTPUT> collection) {
-        List<Object> insertBatchCollection = new LinkedList<>();
+        List<Object> insertBatchCollection = new ArrayList<>(collection.collection().size());
         collection.collection().forEach(data -> {
             try {
                 insertBatchCollection.add(persistenceDAO().prepareBatchInsert(data));
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java
index 056f2cc..f06efa9 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/model/impl/PersistenceWorker.java
@@ -36,7 +36,7 @@ import org.slf4j.*;
  */
 public abstract class PersistenceWorker<INPUT_AND_OUTPUT extends StreamData, COLLECTION extends Collection> extends AbstractLocalAsyncWorker<INPUT_AND_OUTPUT, INPUT_AND_OUTPUT> {
 
-    private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
+    private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
 
     private final IBatchDAO batchDAO;
     private final int blockBatchPersistenceSize;
diff --git a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java
index 7aac165..e37deb6 100644
--- a/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java
+++ b/apm-collector/apm-collector-analysis/analysis-worker-model/src/main/java/org/apache/skywalking/apm/collector/analysis/worker/timer/PersistenceTimer.java
@@ -62,29 +62,42 @@ public enum PersistenceTimer {
 
     @SuppressWarnings("unchecked")
     private void extractDataAndSave(IBatchDAO batchDAO, List<PersistenceWorker> persistenceWorkers) {
-        logger.debug("Extract data and save");
+        if (logger.isDebugEnabled()) {
+            logger.debug("Extract data and save");
+        }
+
         long startTime = System.currentTimeMillis();
         try {
             List batchAllCollection = new LinkedList();
             persistenceWorkers.forEach((PersistenceWorker worker) -> {
-                logger.debug("extract {} worker data and save", worker.getClass().getName());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("extract {} worker data and save", worker.getClass().getName());
+                }
+
                 if (worker.flushAndSwitch()) {
                     List<?> batchCollection = worker.buildBatchCollection();
-                    logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
+                    }
                     batchAllCollection.addAll(batchCollection);
                 }
             });
 
+            if (debug) {
+                logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+            }
             batchDAO.batchPersistence(batchAllCollection);
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
         } finally {
-            logger.debug("persistence data save finish");
+            if (logger.isDebugEnabled()) {
+                logger.debug("persistence data save finish");
+            }
         }
 
         if (debug) {
-            long endTime = System.currentTimeMillis();
-            logger.info("batch persistence duration: {} ms", endTime - startTime);
+            logger.info("batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
         }
     }
 }
diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml
index 05f9e61..7732967 100644
--- a/apm-collector/apm-collector-boot/src/main/resources/application.yml
+++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml
@@ -76,6 +76,11 @@ storage:
     indexShardsNumber: 2
     indexReplicasNumber: 0
     highPerformanceMode: true
+    # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+    bulkActions: 2000 # Execute the bulk every 2000 requests
+    bulkSize: 20 # Flush the bulk every 20 MB
+    flushInterval: 10 # Flush the bulk every 10 seconds, regardless of the number of requests
+    concurrentRequests: 2 # the number of concurrent requests
     # Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted.
     traceDataTTL: 90 # Unit is minute
     minuteMetricDataTTL: 90 # Unit is minute
@@ -106,5 +111,4 @@ configuration:
 #  default:
 #    host: localhost
 #    port: 9411
-#    contextPath: /
-#
\ No newline at end of file
+#    contextPath: /
\ No newline at end of file
diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
index 3cae234..95226c2 100644
--- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
+++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceIdCacheGuavaService.java
@@ -37,7 +37,7 @@ public class ServiceIdCacheGuavaService implements ServiceIdCacheService {
 
     private final Logger logger = LoggerFactory.getLogger(ServiceIdCacheGuavaService.class);
 
-    private final Cache<String, Integer> serviceIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
+    private final Cache<String, Integer> serviceIdCache = CacheBuilder.newBuilder().maximumSize(1000000).build();
 
     private final ModuleManager moduleManager;
     private IServiceNameCacheDAO serviceNameCacheDAO;
diff --git a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
index c587815..37deb09 100644
--- a/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
+++ b/apm-collector/apm-collector-cache/collector-cache-guava-provider/src/main/java/org/apache/skywalking/apm/collector/cache/guava/service/ServiceNameCacheGuavaService.java
@@ -18,18 +18,15 @@
 
 package org.apache.skywalking.apm.collector.cache.guava.service;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.*;
 import org.apache.skywalking.apm.collector.cache.service.ServiceNameCacheService;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.storage.StorageModule;
 import org.apache.skywalking.apm.collector.storage.dao.cache.IServiceNameCacheDAO;
 import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
+import static java.util.Objects.*;
 
 /**
  * @author peng-yongsheng
@@ -38,7 +35,7 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
 
     private final Logger logger = LoggerFactory.getLogger(ServiceNameCacheGuavaService.class);
 
-    private final Cache<Integer, ServiceName> serviceCache = CacheBuilder.newBuilder().maximumSize(10000).build();
+    private final Cache<Integer, ServiceName> serviceCache = CacheBuilder.newBuilder().maximumSize(1000000).build();
 
     private final ModuleManager moduleManager;
     private IServiceNameCacheDAO serviceNameCacheDAO;
@@ -66,6 +63,8 @@ public class ServiceNameCacheGuavaService implements ServiceNameCacheService {
             serviceName = getServiceNameCacheDAO().get(serviceId);
             if (nonNull(serviceName)) {
                 serviceCache.put(serviceId, serviceName);
+            } else {
+                logger.warn("Service id {} is not in cache and persistent storage.", serviceId);
             }
         }
 
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index 8499a14..20a982c 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -18,24 +18,18 @@
 
 package org.apache.skywalking.apm.collector.client.elasticsearch;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.LinkedList;
-import java.util.List;
+import java.net.*;
+import java.util.*;
 import java.util.function.Consumer;
-import org.apache.skywalking.apm.collector.client.Client;
-import org.apache.skywalking.apm.collector.client.ClientException;
-import org.apache.skywalking.apm.collector.client.NameSpace;
+import org.apache.skywalking.apm.collector.client.*;
 import org.apache.skywalking.apm.collector.core.data.CommonTable;
-import org.apache.skywalking.apm.collector.core.util.Const;
-import org.apache.skywalking.apm.collector.core.util.StringUtils;
+import org.apache.skywalking.apm.collector.core.util.*;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.get.GetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
+import org.elasticsearch.action.get.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
@@ -45,11 +39,9 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.reindex.DeleteByQueryAction;
-import org.elasticsearch.index.reindex.DeleteByQueryRequestBuilder;
+import org.elasticsearch.index.reindex.*;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java
index 5e8dd4d..9790526 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/cache/Window.java
@@ -67,7 +67,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
         }
     }
 
-    protected WINDOW_COLLECTION getCurrent() {
+    private WINDOW_COLLECTION getCurrent() {
         return pointer;
     }
 
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
index ece2da9..0ee3a6b 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
@@ -26,6 +26,16 @@ import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
  */
 public abstract class StreamData extends AbstractData implements QueueData {
 
+    private static final ByteColumn[] BYTE_COLUMNS = {};
+
+    private static final StringListColumn[] STRING_LIST_COLUMNS = {};
+
+    private static final LongListColumn[] LONG_LIST_COLUMNS = {};
+
+    private static final IntegerListColumn[] INTEGER_LIST_COLUMNS = {};
+
+    private static final DoubleListColumn[] DOUBLE_LIST_COLUMNS = {};
+
     private EndOfBatchContext endOfBatchContext;
 
     @Override public final EndOfBatchContext getEndOfBatchContext() {
@@ -41,18 +51,18 @@ public abstract class StreamData extends AbstractData implements QueueData {
         DoubleColumn[] doubleColumns, StringListColumn[] stringListColumns,
         LongListColumn[] longListColumns,
         IntegerListColumn[] integerListColumns, DoubleListColumn[] doubleListColumns) {
-        super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], stringListColumns, longListColumns, integerListColumns, doubleListColumns);
+        super(stringColumns, longColumns, integerColumns, doubleColumns, BYTE_COLUMNS, stringListColumns, longListColumns, integerListColumns, doubleListColumns);
     }
 
     public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
         IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns) {
-        super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+        super(stringColumns, longColumns, integerColumns, doubleColumns, BYTE_COLUMNS, STRING_LIST_COLUMNS, LONG_LIST_COLUMNS, INTEGER_LIST_COLUMNS, DOUBLE_LIST_COLUMNS);
     }
 
     public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
         IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns,
         ByteColumn[] byteColumns) {
-        super(stringColumns, longColumns, integerColumns, doubleColumns, byteColumns, new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+        super(stringColumns, longColumns, integerColumns, doubleColumns, byteColumns, STRING_LIST_COLUMNS, LONG_LIST_COLUMNS, INTEGER_LIST_COLUMNS, DOUBLE_LIST_COLUMNS);
     }
 
     @Override public final String selectKey() {
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
index d62d11f..8a5f017 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.collector.core.data.*;
  * @author peng-yongsheng
  */
 public class NonMergeOperation implements MergeOperation {
+
     @Override public String operate(String newValue, String oldValue) {
         return oldValue;
     }
diff --git a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java
index 63e2e6d..1500b84 100644
--- a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java
+++ b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/MetricTree.java
@@ -20,14 +20,10 @@ package org.apache.skywalking.apm.collector.instrument;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * @author wusheng, peng-yongsheng
@@ -61,8 +57,8 @@ public enum MetricTree implements Runnable {
             logBuffer.append("##################################################################################################################").append(lineSeparator);
             logBuffer.append("#                                             Collector Service Report                                           #").append(lineSeparator);
             logBuffer.append("##################################################################################################################").append(lineSeparator);
-            metrics.forEach((MetricNode metric) -> metric.toOutput(new ReportWriter() {
 
+            metrics.forEach((MetricNode metric) -> metric.toOutput(new ReportWriter() {
                 @Override public void writeMetricName(String name) {
                     logBuffer.append(name).append(lineSeparator);
                 }
diff --git a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java
index 29b6af9..add628e 100644
--- a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java
+++ b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/ServiceMetricTracing.java
@@ -19,19 +19,15 @@
 package org.apache.skywalking.apm.collector.instrument;
 
 import java.lang.reflect.Method;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import net.bytebuddy.implementation.bind.annotation.AllArguments;
-import net.bytebuddy.implementation.bind.annotation.Origin;
-import net.bytebuddy.implementation.bind.annotation.RuntimeType;
-import net.bytebuddy.implementation.bind.annotation.SuperCall;
-import net.bytebuddy.implementation.bind.annotation.This;
+import java.util.concurrent.*;
+import net.bytebuddy.implementation.bind.annotation.*;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 
 /**
  * @author wu-sheng
  */
 public class ServiceMetricTracing {
+
     private volatile ConcurrentHashMap<Method, ServiceMetric> metrics = new ConcurrentHashMap<>();
 
     ServiceMetricTracing() {
diff --git a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java
index 595b5f1..7489510 100644
--- a/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java
+++ b/apm-collector/apm-collector-instrument/src/main/java/org/apache/skywalking/apm/collector/instrument/tools/ReportFormatter.java
@@ -18,10 +18,8 @@
 
 package org.apache.skywalking.apm.collector.instrument.tools;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.*;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -36,20 +34,13 @@ class ReportFormatter {
         logger.info(System.lineSeparator() + "Formatted report: ");
 
         report.getMetrics().forEach(metric -> {
-            String[] subMetricNames = metric.getMetricName().split("/");
-
-            String metricName = "";
-            for (String subMetricName : subMetricNames) {
-                if (subMetricName != null && !subMetricName.equals("")) {
-                    metricName = metricName + "/" + subMetricName;
-
-                    if (!metricMap.containsKey(metricName)) {
-                        Metric newMetric = new Metric();
-                        newMetric.setMetricName(metricName);
-                        metricMap.put(metricName, newMetric);
-                    }
-                    metricMap.get(metricName).merge(metric);
-                }
+            if (metricMap.containsKey(metric.getMetricName())) {
+                Metric existMetric = metricMap.get(metric.getMetricName());
+                existMetric.setTotal(existMetric.getTotal() + metric.getTotal());
+                existMetric.setCalls(existMetric.getCalls() + metric.getCalls());
+                existMetric.setAvg(existMetric.getTotal() / existMetric.getCalls());
+            } else {
+                metricMap.put(metric.getMetricName(), metric);
             }
         });
 
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java
index 98460f9..86d0a6e 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/selector/ForeverFirstSelector.java
@@ -32,7 +32,9 @@ public class ForeverFirstSelector implements RemoteClientSelector {
     private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);
 
     @Override public RemoteClient select(List<RemoteClient> clients, RemoteData remoteData) {
-        logger.debug("clients size: {}", clients.size());
+        if (logger.isDebugEnabled()) {
+            logger.debug("clients size: {}", clients.size());
+        }
         return clients.get(0);
     }
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java
index 723e757..fcf1af4 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.collector.storage.base.dao;
 
+import java.io.IOException;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
 
 /**
@@ -27,9 +28,9 @@ public interface IPersistenceDAO<INSERT, UPDATE, STREAM_DATA extends StreamData>
 
     STREAM_DATA get(String id);
 
-    INSERT prepareBatchInsert(STREAM_DATA data);
+    INSERT prepareBatchInsert(STREAM_DATA data) throws IOException;
 
-    UPDATE prepareBatchUpdate(STREAM_DATA data);
+    UPDATE prepareBatchUpdate(STREAM_DATA data) throws IOException;
 
     void deleteHistory(Long timeBucketBefore);
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
index af918f3..78a8449 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
@@ -43,8 +43,11 @@ public class ApplicationAlarm extends StreamData implements Alarm {
         new IntegerColumn(ApplicationAlarmTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
index d9627cc..9c4aaad 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
@@ -44,8 +44,11 @@ public class ApplicationAlarmList extends StreamData {
         new IntegerColumn(ApplicationAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
index 8485720..a48b3a5 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
@@ -44,8 +44,11 @@ public class ApplicationReferenceAlarm extends StreamData implements Alarm {
         new IntegerColumn(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
index 6ff140a..2349761 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
@@ -44,8 +44,11 @@ public class ApplicationReferenceAlarmList extends StreamData {
         new IntegerColumn(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
index cd3ab1d..1643bdc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
@@ -44,8 +44,11 @@ public class InstanceAlarm extends StreamData implements Alarm {
         new IntegerColumn(InstanceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public InstanceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
index ef8f4bc..7acc3bc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
@@ -44,8 +44,11 @@ public class InstanceAlarmList extends StreamData {
         new IntegerColumn(InstanceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public InstanceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
index ca4a883..d94b97d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
@@ -46,8 +46,11 @@ public class InstanceReferenceAlarm extends StreamData implements Alarm {
         new IntegerColumn(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public InstanceReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
index 4814a0b..c6e38da 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
@@ -46,8 +46,11 @@ public class InstanceReferenceAlarmList extends StreamData {
         new IntegerColumn(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public InstanceReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
index f3dc1dc..12220f6 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
@@ -45,8 +45,11 @@ public class ServiceAlarm extends StreamData implements Alarm {
         new IntegerColumn(ServiceAlarmTable.SERVICE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ServiceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
index 98bb5aa..f60cc49 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
@@ -45,8 +45,11 @@ public class ServiceAlarmList extends StreamData {
         new IntegerColumn(ServiceAlarmListTable.SERVICE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ServiceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
index b4483f8..1d070d4 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
@@ -48,8 +48,11 @@ public class ServiceReferenceAlarm extends StreamData implements Alarm {
         new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ServiceReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
index d42b240..ead2316 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
@@ -48,8 +48,11 @@ public class ServiceReferenceAlarmList extends StreamData {
         new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ServiceReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
index 31c44a3..7bca4bd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
@@ -42,8 +42,11 @@ public class ApplicationComponent extends StreamData {
         new IntegerColumn(ApplicationComponentTable.APPLICATION_ID, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationComponent() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
index 811b051..cbce18f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
@@ -42,8 +42,11 @@ public class ApplicationMapping extends StreamData {
         new IntegerColumn(ApplicationMappingTable.MAPPING_APPLICATION_ID, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationMapping() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
index 694ddef..2c16f27 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
@@ -62,8 +62,11 @@ public class ApplicationMetric extends StreamData implements Metric {
         new IntegerColumn(ApplicationMetricTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
index e0f994d..ec7ef4d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
@@ -64,8 +64,11 @@ public class ApplicationReferenceMetric extends StreamData implements Metric {
         new IntegerColumn(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ApplicationReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
index 44bb44c..7acc38d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
@@ -37,8 +37,14 @@ public class GlobalTrace extends StreamData {
         new LongColumn(GlobalTraceTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+    };
+
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public GlobalTrace() {
-        super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
index 1974509..688899e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
@@ -44,8 +44,11 @@ public class ResponseTimeDistribution extends StreamData {
         new IntegerColumn(ResponseTimeDistributionTable.STEP, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ResponseTimeDistribution() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
index 5183663..3982b24 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
@@ -43,8 +43,11 @@ public class InstanceMapping extends StreamData {
         new IntegerColumn(InstanceMappingTable.ADDRESS_ID, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public InstanceMapping() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
index 8ffe0d8..0d5850e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
@@ -60,8 +60,11 @@ public class InstanceMetric extends StreamData implements Metric {
         new IntegerColumn(InstanceMetricTable.INSTANCE_ID, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public InstanceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
index 822d8e7..471a174 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
@@ -62,8 +62,11 @@ public class InstanceReferenceMetric extends StreamData implements Metric {
         new IntegerColumn(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public InstanceReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
index 76d4cc6..ec50077 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
@@ -44,8 +44,11 @@ public class GCMetric extends StreamData {
         new IntegerColumn(GCMetricTable.PHRASE, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public GCMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
index b37514a..9c5e6ea 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
@@ -46,8 +46,11 @@ public class MemoryMetric extends StreamData {
         new IntegerColumn(MemoryMetricTable.IS_HEAP, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public MemoryMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
index 5418136..e9caf53 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
@@ -46,8 +46,11 @@ public class MemoryPoolMetric extends StreamData {
         new IntegerColumn(MemoryPoolMetricTable.POOL_TYPE, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public MemoryPoolMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
index 928d032..38728da 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
@@ -39,8 +39,14 @@ public class Application extends StreamData {
         new IntegerColumn(ApplicationTable.IS_ADDRESS, new CoverMergeOperation()),
     };
 
+    private static final LongColumn[] LONG_COLUMNS = {
+    };
+
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public Application() {
-        super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
index e8a78cb..78f9a36 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
@@ -47,8 +47,11 @@ public class Instance extends StreamData {
         new IntegerColumn(InstanceTable.IS_ADDRESS, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public Instance() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
index 831f113..5e248b1 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
@@ -39,9 +39,14 @@ public class NetworkAddress extends StreamData {
         new IntegerColumn(NetworkAddressTable.SERVER_TYPE, new CoverMergeOperation()),
     };
 
-    public NetworkAddress() {
-        super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+    private static final LongColumn[] LONG_COLUMNS = {
+    };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
+    public NetworkAddress() {
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
index d04a7f0..47c2c98 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
@@ -44,8 +44,11 @@ public class ServiceName extends StreamData {
         new IntegerColumn(ServiceNameTable.SRC_SPAN_TYPE, new CoverMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ServiceName() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
index 3a411c7..81581c3 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
@@ -39,8 +39,14 @@ public class Segment extends StreamData {
         new ByteColumn(SegmentTable.DATA_BINARY, new CoverMergeOperation()),
     };
 
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+    };
+
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public Segment() {
-        super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0], BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS, BYTE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
index 91cdf74..e7fa9a2 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
@@ -61,8 +61,11 @@ public class ServiceMetric extends StreamData implements Metric {
         new IntegerColumn(ServiceMetricTable.SERVICE_ID, new NonMergeOperation()),
     };
 
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
+
     public ServiceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
index e644e02..d51d32a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
@@ -67,9 +67,11 @@ public class ServiceReferenceMetric extends StreamData implements Metric {
         new IntegerColumn(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    public ServiceReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+    };
 
+    public ServiceReferenceMetric() {
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java
index 708ce12..95efd15 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/MetricTransformUtil.java
@@ -18,9 +18,10 @@
 
 package org.apache.skywalking.apm.collector.storage.es;
 
+import java.io.IOException;
 import java.util.Map;
-import org.apache.skywalking.apm.collector.storage.table.Metric;
-import org.apache.skywalking.apm.collector.storage.table.MetricColumns;
+import org.apache.skywalking.apm.collector.storage.table.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 
 /**
  * @author peng-yongsheng
@@ -51,26 +52,26 @@ public enum MetricTransformUtil {
         target.setMqTransactionAverageDuration(((Number)source.get(MetricColumns.MQ_TRANSACTION_AVERAGE_DURATION.getName())).longValue());
     }
 
-    public void esStreamDataToEsData(Metric source, Map<String, Object> target) {
-        target.put(MetricColumns.TIME_BUCKET.getName(), source.getTimeBucket());
-        target.put(MetricColumns.SOURCE_VALUE.getName(), source.getSourceValue());
+    public void esStreamDataToEsData(Metric source, XContentBuilder target) throws IOException {
+        target.field(MetricColumns.TIME_BUCKET.getName(), source.getTimeBucket());
+        target.field(MetricColumns.SOURCE_VALUE.getName(), source.getSourceValue());
 
-        target.put(MetricColumns.TRANSACTION_CALLS.getName(), source.getTransactionCalls());
-        target.put(MetricColumns.TRANSACTION_ERROR_CALLS.getName(), source.getTransactionErrorCalls());
-        target.put(MetricColumns.TRANSACTION_DURATION_SUM.getName(), source.getTransactionDurationSum());
-        target.put(MetricColumns.TRANSACTION_ERROR_DURATION_SUM.getName(), source.getTransactionErrorDurationSum());
-        target.put(MetricColumns.TRANSACTION_AVERAGE_DURATION.getName(), source.getTransactionAverageDuration());
+        target.field(MetricColumns.TRANSACTION_CALLS.getName(), source.getTransactionCalls());
+        target.field(MetricColumns.TRANSACTION_ERROR_CALLS.getName(), source.getTransactionErrorCalls());
+        target.field(MetricColumns.TRANSACTION_DURATION_SUM.getName(), source.getTransactionDurationSum());
+        target.field(MetricColumns.TRANSACTION_ERROR_DURATION_SUM.getName(), source.getTransactionErrorDurationSum());
+        target.field(MetricColumns.TRANSACTION_AVERAGE_DURATION.getName(), source.getTransactionAverageDuration());
 
-        target.put(MetricColumns.BUSINESS_TRANSACTION_CALLS.getName(), source.getBusinessTransactionCalls());
-        target.put(MetricColumns.BUSINESS_TRANSACTION_ERROR_CALLS.getName(), source.getBusinessTransactionErrorCalls());
-        target.put(MetricColumns.BUSINESS_TRANSACTION_DURATION_SUM.getName(), source.getBusinessTransactionDurationSum());
-        target.put(MetricColumns.BUSINESS_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getBusinessTransactionErrorDurationSum());
-        target.put(MetricColumns.BUSINESS_TRANSACTION_AVERAGE_DURATION.getName(), source.getBusinessTransactionAverageDuration());
+        target.field(MetricColumns.BUSINESS_TRANSACTION_CALLS.getName(), source.getBusinessTransactionCalls());
+        target.field(MetricColumns.BUSINESS_TRANSACTION_ERROR_CALLS.getName(), source.getBusinessTransactionErrorCalls());
+        target.field(MetricColumns.BUSINESS_TRANSACTION_DURATION_SUM.getName(), source.getBusinessTransactionDurationSum());
+        target.field(MetricColumns.BUSINESS_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getBusinessTransactionErrorDurationSum());
+        target.field(MetricColumns.BUSINESS_TRANSACTION_AVERAGE_DURATION.getName(), source.getBusinessTransactionAverageDuration());
 
-        target.put(MetricColumns.MQ_TRANSACTION_CALLS.getName(), source.getMqTransactionCalls());
-        target.put(MetricColumns.MQ_TRANSACTION_ERROR_CALLS.getName(), source.getMqTransactionErrorCalls());
-        target.put(MetricColumns.MQ_TRANSACTION_DURATION_SUM.getName(), source.getMqTransactionDurationSum());
-        target.put(MetricColumns.MQ_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getMqTransactionErrorDurationSum());
-        target.put(MetricColumns.MQ_TRANSACTION_AVERAGE_DURATION.getName(), source.getMqTransactionAverageDuration());
+        target.field(MetricColumns.MQ_TRANSACTION_CALLS.getName(), source.getMqTransactionCalls());
+        target.field(MetricColumns.MQ_TRANSACTION_ERROR_CALLS.getName(), source.getMqTransactionErrorCalls());
+        target.field(MetricColumns.MQ_TRANSACTION_DURATION_SUM.getName(), source.getMqTransactionDurationSum());
+        target.field(MetricColumns.MQ_TRANSACTION_ERROR_DURATION_SUM.getName(), source.getMqTransactionErrorDurationSum());
+        target.field(MetricColumns.MQ_TRANSACTION_AVERAGE_DURATION.getName(), source.getMqTransactionAverageDuration());
     }
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java
index 83b28eb..cdde086 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsConfig.java
@@ -33,6 +33,10 @@ public class StorageModuleEsConfig extends ElasticSearchClientConfig {
     private int hourMetricDataTTL = 36;
     private int dayMetricDataTTL = 45;
     private int monthMetricDataTTL = 18;
+    private int bulkActions = 2000;
+    private int bulkSize = 20;
+    private int flushInterval = 10;
+    private int concurrentRequests = 2;
 
     int getIndexShardsNumber() {
         return indexShardsNumber;
@@ -97,4 +101,36 @@ public class StorageModuleEsConfig extends ElasticSearchClientConfig {
     void setMonthMetricDataTTL(int monthMetricDataTTL) {
         this.monthMetricDataTTL = monthMetricDataTTL == 0 ? 18 : monthMetricDataTTL;
     }
+
+    public int getBulkActions() {
+        return bulkActions;
+    }
+
+    public void setBulkActions(int bulkActions) {
+        this.bulkActions = bulkActions == 0 ? 2000 : bulkActions;
+    }
+
+    public int getBulkSize() {
+        return bulkSize;
+    }
+
+    public void setBulkSize(int bulkSize) {
+        this.bulkSize = bulkSize == 0 ? 20 : bulkSize;
+    }
+
+    public int getFlushInterval() {
+        return flushInterval;
+    }
+
+    public void setFlushInterval(int flushInterval) {
+        this.flushInterval = flushInterval == 0 ? 10 : flushInterval;
+    }
+
+    public int getConcurrentRequests() {
+        return concurrentRequests;
+    }
+
+    public void setConcurrentRequests(int concurrentRequests) {
+        this.concurrentRequests = concurrentRequests == 0 ? 2 : concurrentRequests;
+    }
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index a71226a..b33000a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -48,7 +48,7 @@ import org.apache.skywalking.apm.collector.storage.dao.rtd.*;
 import org.apache.skywalking.apm.collector.storage.dao.smp.*;
 import org.apache.skywalking.apm.collector.storage.dao.srmp.*;
 import org.apache.skywalking.apm.collector.storage.dao.ui.*;
-import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
+import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchProcessEsDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
 import org.apache.skywalking.apm.collector.storage.es.dao.*;
 import org.apache.skywalking.apm.collector.storage.es.dao.acp.*;
@@ -105,7 +105,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
         elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes(), nameSpace);
 
         this.registerServiceImplementation(ITTLConfigService.class, new TTLConfigService(config));
-        this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
+        this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
 
         registerCacheDAO();
         registerRegisterDAO();
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
index af6f70d..307cc81 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/AbstractPersistenceEsDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.collector.storage.es.base.dao;
 
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
@@ -25,6 +26,7 @@ import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.slf4j.*;
@@ -56,17 +58,17 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
         }
     }
 
-    protected abstract Map<String, Object> esStreamDataToEsData(STREAM_DATA streamData);
+    protected abstract XContentBuilder esStreamDataToEsData(STREAM_DATA streamData) throws IOException;
 
     @Override
-    public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) {
-        Map<String, Object> source = esStreamDataToEsData(streamData);
+    public final IndexRequestBuilder prepareBatchInsert(STREAM_DATA streamData) throws IOException {
+        XContentBuilder source = esStreamDataToEsData(streamData);
         return getClient().prepareIndex(tableName(), streamData.getId()).setSource(source);
     }
 
     @Override
-    public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) {
-        Map<String, Object> source = esStreamDataToEsData(streamData);
+    public final UpdateRequestBuilder prepareBatchUpdate(STREAM_DATA streamData) throws IOException {
+        XContentBuilder source = esStreamDataToEsData(streamData);
         return getClient().prepareUpdate(tableName(), streamData.getId()).setDoc(source);
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchEsDAO.java
deleted file mode 100644
index 2cd5036..0000000
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchEsDAO.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.collector.storage.es.base.dao;
-
-import java.util.List;
-import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
-import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
-import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
-import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
-import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequestBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author peng-yongsheng
- */
-public class BatchEsDAO extends EsDAO implements IBatchDAO {
-
-    private final Logger logger = LoggerFactory.getLogger(BatchEsDAO.class);
-
-    public BatchEsDAO(ElasticSearchClient client) {
-        super(client);
-    }
-
-    @GraphComputingMetric(name = "/persistence/batchPersistence/")
-    @Override public void batchPersistence(@BatchParameter List<?> batchCollection) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("bulk data size: {}", batchCollection.size());
-        }
-        if (CollectionUtils.isNotEmpty(batchCollection)) {
-            BulkRequestBuilder bulkRequest = getClient().prepareBulk();
-
-            batchCollection.forEach(builder -> {
-                if (builder instanceof IndexRequestBuilder) {
-                    bulkRequest.add((IndexRequestBuilder)builder);
-                }
-                if (builder instanceof UpdateRequestBuilder) {
-                    bulkRequest.add((UpdateRequestBuilder)builder);
-                }
-            });
-
-            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-            if (bulkResponse.hasFailures()) {
-                logger.error(bulkResponse.buildFailureMessage());
-                for (BulkItemResponse itemResponse : bulkResponse.getItems()) {
-                    logger.error("Bulk request failure, index: {}, id: {}", itemResponse.getIndex(), itemResponse.getId());
-                }
-            }
-        }
-    }
-}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchProcessEsDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchProcessEsDAO.java
new file mode 100644
index 0000000..d3af889
--- /dev/null
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/dao/BatchProcessEsDAO.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.storage.es.base.dao;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.apm.collector.core.UnexpectedException;
+import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
+import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
+import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchProcessEsDAO.class);
+
+    private BulkProcessor bulkProcessor;
+    private final int bulkActions;
+    private final int bulkSize;
+    private final int flushInterval;
+    private final int concurrentRequests;
+
+    public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
+        int concurrentRequests) {
+        super(client);
+        this.bulkActions = bulkActions;
+        this.bulkSize = bulkSize;
+        this.flushInterval = flushInterval;
+        this.concurrentRequests = concurrentRequests;
+    }
+
+    @GraphComputingMetric(name = "/persistence/batchPersistence/")
+    @Override public void batchPersistence(List<?> batchCollection) {
+        if (bulkProcessor == null) {
+            this.bulkProcessor = createBulkProcessor();
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("bulk data size: {}", batchCollection.size());
+        }
+
+        if (CollectionUtils.isNotEmpty(batchCollection)) {
+            batchCollection.forEach(builder -> {
+                if (builder instanceof IndexRequestBuilder) {
+                    this.bulkProcessor.add(((IndexRequestBuilder)builder).request());
+                }
+                if (builder instanceof UpdateRequestBuilder) {
+                    this.bulkProcessor.add(((UpdateRequestBuilder)builder).request());
+                }
+            });
+        }
+    }
+
+    private BulkProcessor createBulkProcessor() {
+        ElasticSearchClient elasticSearchClient = getClient();
+
+        Client client;
+        try {
+            Field field = elasticSearchClient.getClass().getDeclaredField("client");
+            field.setAccessible(true);
+            client = (Client)field.get(elasticSearchClient);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new UnexpectedException(e.getMessage());
+        }
+
+        return BulkProcessor.builder(
+            client,
+            new BulkProcessor.Listener() {
+                @Override
+                public void beforeBulk(long executionId,
+                    BulkRequest request) {
+                }
+
+                @Override
+                public void afterBulk(long executionId,
+                    BulkRequest request,
+                    BulkResponse response) {
+                }
+
+                @Override
+                public void afterBulk(long executionId,
+                    BulkRequest request,
+                    Throwable failure) {
+                    logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
+                }
+            })
+            .setBulkActions(bulkActions)
+            .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
+            .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
+            .setConcurrentRequests(concurrentRequests)
+            .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
+            .build();
+    }
+}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
index 1ac1490..4dc8428 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/GlobalTraceEsPersistenceDAO.java
@@ -18,7 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao;
 
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
@@ -26,6 +27,7 @@ import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersisten
 import org.apache.skywalking.apm.collector.storage.table.global.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -48,12 +50,13 @@ public class GlobalTraceEsPersistenceDAO extends AbstractPersistenceEsDAO<Global
         return globalTrace;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(GlobalTrace streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(GlobalTraceTable.SEGMENT_ID.getName(), streamData.getSegmentId());
-        target.put(GlobalTraceTable.TRACE_ID.getName(), streamData.getTraceId());
-        target.put(GlobalTraceTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+    @Override protected XContentBuilder esStreamDataToEsData(GlobalTrace streamData) throws IOException {
+        return XContentFactory.jsonBuilder()
+            .startObject()
+            .field(GlobalTraceTable.SEGMENT_ID.getName(), streamData.getSegmentId())
+            .field(GlobalTraceTable.TRACE_ID.getName(), streamData.getTraceId())
+            .field(GlobalTraceTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java
index f7ee828..83f3452 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/InstanceHeartBeatEsPersistenceDAO.java
@@ -18,7 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao;
 
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.UnexpectedException;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
@@ -28,6 +29,7 @@ import org.apache.skywalking.apm.collector.storage.table.register.*;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 import org.slf4j.*;
 
 /**
@@ -63,9 +65,11 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
         throw new UnexpectedException("Received an instance heart beat message under instance id= " + data.getId() + " , which doesn't exist.");
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(Instance data) {
-        Map<String, Object> source = new HashMap<>();
-        source.put(InstanceTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime());
+    @Override public UpdateRequestBuilder prepareBatchUpdate(Instance data) throws IOException {
+        XContentBuilder source = XContentFactory.jsonBuilder().startObject()
+            .field(InstanceTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime())
+            .endObject();
+        
         return getClient().prepareUpdate(InstanceTable.TABLE, data.getId()).setDoc(source);
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
index 24a739d..9814964 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
@@ -18,14 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao;
 
-import com.google.gson.Gson;
-import java.util.*;
+import java.io.IOException;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
 import org.apache.skywalking.apm.collector.storage.table.segment.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.slf4j.*;
@@ -35,9 +35,7 @@ import org.slf4j.*;
  */
 public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDurationPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, SegmentDuration> {
 
-    private final Logger logger = LoggerFactory.getLogger(SegmentDurationEsPersistenceDAO.class);
-
-    private final Gson gson = new Gson();
+    private static final Logger logger = LoggerFactory.getLogger(SegmentDurationEsPersistenceDAO.class);
 
     public SegmentDurationEsPersistenceDAO(ElasticSearchClient client) {
         super(client);
@@ -54,16 +52,18 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
     }
 
     @Override
-    public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(SegmentDurationTable.SEGMENT_ID.getName(), data.getSegmentId());
-        target.put(SegmentDurationTable.APPLICATION_ID.getName(), data.getApplicationId());
-        target.put(SegmentDurationTable.SERVICE_NAME.getName(), gson.toJson(data.getServiceName()));
-        target.put(SegmentDurationTable.DURATION.getName(), data.getDuration());
-        target.put(SegmentDurationTable.START_TIME.getName(), data.getStartTime());
-        target.put(SegmentDurationTable.END_TIME.getName(), data.getEndTime());
-        target.put(SegmentDurationTable.IS_ERROR.getName(), data.getIsError());
-        target.put(SegmentDurationTable.TIME_BUCKET.getName(), data.getTimeBucket());
+    public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) throws IOException {
+        XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+            .field(SegmentDurationTable.SEGMENT_ID.getName(), data.getSegmentId())
+            .field(SegmentDurationTable.APPLICATION_ID.getName(), data.getApplicationId())
+            .array(SegmentDurationTable.SERVICE_NAME.getName(), data.getServiceName())
+            .field(SegmentDurationTable.DURATION.getName(), data.getDuration())
+            .field(SegmentDurationTable.START_TIME.getName(), data.getStartTime())
+            .field(SegmentDurationTable.END_TIME.getName(), data.getEndTime())
+            .field(SegmentDurationTable.IS_ERROR.getName(), data.getIsError())
+            .field(SegmentDurationTable.TIME_BUCKET.getName(), data.getTimeBucket())
+            .endObject();
+
         return getClient().prepareIndex(SegmentDurationTable.TABLE, data.getId()).setSource(target);
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
index 00dcdc3..6662607 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentEsPersistenceDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao;
 
+import java.io.IOException;
 import java.util.*;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
@@ -26,6 +27,7 @@ import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersisten
 import org.apache.skywalking.apm.collector.storage.table.segment.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -47,11 +49,11 @@ public class SegmentEsPersistenceDAO extends AbstractPersistenceEsDAO<Segment> i
         return segment;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(Segment streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(SegmentTable.DATA_BINARY.getName(), new String(Base64.getEncoder().encode(streamData.getDataBinary())));
-        target.put(SegmentTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+    @Override protected XContentBuilder esStreamDataToEsData(Segment streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(SegmentTable.DATA_BINARY.getName(), new String(Base64.getEncoder().encode(streamData.getDataBinary())))
+            .field(SegmentTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java
index 1c9b826..c3d0525 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ServiceNameHeartBeatEsPersistenceDAO.java
@@ -18,7 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao;
 
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.UnexpectedException;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
@@ -28,6 +29,7 @@ import org.apache.skywalking.apm.collector.storage.table.register.*;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 import org.slf4j.*;
 
 /**
@@ -43,18 +45,24 @@ public class ServiceNameHeartBeatEsPersistenceDAO extends EsDAO implements IServ
 
     @GraphComputingMetric(name = "/persistence/get/" + ServiceNameTable.TABLE + "/heartbeat")
     @Override public ServiceName get(String id) {
-        GetResponse getResponse = getClient().prepareGet(ServiceNameTable.TABLE, id).get();
+        String[] includeSources = {ServiceNameTable.HEARTBEAT_TIME.getName()};
+        GetResponse getResponse = getClient().prepareGet(ServiceNameTable.TABLE, id).setFetchSource(includeSources, null).get();
         if (getResponse.isExists()) {
             Map<String, Object> source = getResponse.getSource();
 
             ServiceName serviceName = new ServiceName();
             serviceName.setId(id);
-            serviceName.setServiceId(((Number)source.get(ServiceNameTable.SERVICE_ID.getName())).intValue());
+            serviceName.setServiceId(Integer.valueOf(id));
             serviceName.setHeartBeatTime(((Number)source.get(ServiceNameTable.HEARTBEAT_TIME.getName())).longValue());
-            logger.debug("service id: {} is exists", id);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("service id: {} is exists", id);
+            }
             return serviceName;
         } else {
-            logger.debug("service id: {} is not exists", id);
+            if (logger.isDebugEnabled()) {
+                logger.debug("service id: {} is not exists", id);
+            }
             return null;
         }
     }
@@ -63,11 +71,15 @@ public class ServiceNameHeartBeatEsPersistenceDAO extends EsDAO implements IServ
         throw new UnexpectedException("Received an service name heart beat message under service id= " + data.getId() + " , which doesn't exist.");
     }
 
-    @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceName data) {
-        logger.info("service name heart beat, service id: {}, heart beat time: {}", data.getId(), data.getHeartBeatTime());
+    @Override public UpdateRequestBuilder prepareBatchUpdate(ServiceName data) throws IOException {
+        if (logger.isDebugEnabled()) {
+            logger.debug("service name heart beat, service id: {}, heart beat time: {}", data.getId(), data.getHeartBeatTime());
+        }
+
+        XContentBuilder source = XContentFactory.jsonBuilder().startObject()
+            .field(ServiceNameTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime())
+            .endObject();
 
-        Map<String, Object> source = new HashMap<>();
-        source.put(ServiceNameTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime());
         return getClient().prepareUpdate(ServiceNameTable.TABLE, data.getId()).setDoc(source);
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java
index 42377e8..45fa5f9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/acp/AbstractApplicationComponentEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.acp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponentTable;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -49,15 +49,14 @@ public abstract class AbstractApplicationComponentEsPersistenceDAO extends Abstr
         return applicationComponent;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationComponent streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationComponentTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(ApplicationComponent streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationComponentTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(ApplicationComponentTable.COMPONENT_ID.getName(), streamData.getComponentId());
-        target.put(ApplicationComponentTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(ApplicationComponentTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
-        return target;
+            .field(ApplicationComponentTable.COMPONENT_ID.getName(), streamData.getComponentId())
+            .field(ApplicationComponentTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(ApplicationComponentTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + ApplicationComponentTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java
index 047a786..958f7d7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/AbstractApplicationAlarmListEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -52,17 +52,17 @@ public abstract class AbstractApplicationAlarmListEsPersistenceDAO extends Abstr
         return applicationAlarmList;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationAlarmList streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationAlarmListTable.METRIC_ID.getName(), streamData.getMetricId());
-        target.put(ApplicationAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(ApplicationAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected final XContentBuilder esStreamDataToEsData(ApplicationAlarmList streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationAlarmListTable.METRIC_ID.getName(), streamData.getMetricId())
+            .field(ApplicationAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(ApplicationAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ApplicationAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ApplicationAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ApplicationAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ApplicationAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ApplicationAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+            .field(ApplicationAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + ApplicationAlarmListTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
index 6b7e3d1..15a4b32 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -54,16 +54,16 @@ public class ApplicationAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO<A
         return instanceAlarm;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(ApplicationAlarm streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(ApplicationAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(ApplicationAlarm streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(ApplicationAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ApplicationAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ApplicationAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ApplicationAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ApplicationAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ApplicationAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
-        return target;
+            .field(ApplicationAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
index fd5069e..a723f71 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -55,17 +55,17 @@ public class ApplicationReferenceAlarmEsPersistenceDAO extends AbstractPersisten
         return applicationReferenceAlarm;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(ApplicationReferenceAlarm streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(ApplicationReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(ApplicationReferenceAlarm streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(ApplicationReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ApplicationReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ApplicationReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ApplicationReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ApplicationReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
-        return target;
+            .field(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
index e988a1b..8945091 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ApplicationReferenceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationReferenceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -55,17 +55,18 @@ public class ApplicationReferenceAlarmListEsPersistenceDAO extends AbstractPersi
         return applicationReferenceAlarmList;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(ApplicationReferenceAlarmList streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(ApplicationReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override
+    protected XContentBuilder esStreamDataToEsData(ApplicationReferenceAlarmList streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(ApplicationReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ApplicationReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ApplicationReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ApplicationReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ApplicationReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ApplicationReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+            .field(ApplicationReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
index 3d53fc5..c19f528 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -55,17 +55,17 @@ public class InstanceAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO<Inst
         return instanceAlarm;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(InstanceAlarm streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(InstanceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(InstanceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(InstanceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(InstanceAlarm streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(InstanceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(InstanceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(InstanceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(InstanceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(InstanceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(InstanceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(InstanceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(InstanceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
-        return target;
+            .field(InstanceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
index 96fef53..5f5ffa7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -55,17 +55,17 @@ public class InstanceAlarmListEsPersistenceDAO extends AbstractPersistenceEsDAO<
         return instanceAlarmList;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(InstanceAlarmList streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(InstanceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(InstanceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(InstanceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(InstanceAlarmList streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(InstanceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(InstanceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(InstanceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(InstanceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(InstanceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(InstanceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(InstanceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(InstanceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+            .field(InstanceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
index fdda6ea..7c76791 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -57,19 +57,19 @@ public class InstanceReferenceAlarmEsPersistenceDAO extends AbstractPersistenceE
         return instanceReferenceAlarm;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(InstanceReferenceAlarm streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
-        target.put(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
-        target.put(InstanceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(InstanceReferenceAlarm streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+            .field(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+            .field(InstanceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(InstanceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(InstanceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(InstanceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(InstanceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(InstanceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
-        return target;
+            .field(InstanceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
index d695cc5..f90943a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/InstanceReferenceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceReferenceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -57,19 +57,19 @@ public class InstanceReferenceAlarmListEsPersistenceDAO extends AbstractPersiste
         return serviceReferenceAlarmList;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(InstanceReferenceAlarmList streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
-        target.put(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
-        target.put(InstanceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(InstanceReferenceAlarmList streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+            .field(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+            .field(InstanceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(InstanceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(InstanceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(InstanceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(InstanceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(InstanceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+            .field(InstanceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
index a16d2fa..8a330ee 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -56,18 +56,18 @@ public class ServiceAlarmEsPersistenceDAO extends AbstractPersistenceEsDAO<Servi
         return serviceAlarm;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(ServiceAlarm streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ServiceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(ServiceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(ServiceAlarmTable.SERVICE_ID.getName(), streamData.getServiceId());
-        target.put(ServiceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(ServiceAlarm streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ServiceAlarmTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(ServiceAlarmTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(ServiceAlarmTable.SERVICE_ID.getName(), streamData.getServiceId())
+            .field(ServiceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ServiceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ServiceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ServiceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ServiceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ServiceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
-        return target;
+            .field(ServiceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
index 39e4b2e..33078a9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -56,18 +56,18 @@ public class ServiceAlarmListEsPersistenceDAO extends AbstractPersistenceEsDAO<S
         return serviceAlarmList;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(ServiceAlarmList streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ServiceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(ServiceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(ServiceAlarmListTable.SERVICE_ID.getName(), streamData.getServiceId());
-        target.put(ServiceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(ServiceAlarmList streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ServiceAlarmListTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(ServiceAlarmListTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(ServiceAlarmListTable.SERVICE_ID.getName(), streamData.getServiceId())
+            .field(ServiceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ServiceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ServiceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ServiceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ServiceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ServiceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+            .field(ServiceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
index 407258a..25f819d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarm;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarmTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -59,21 +59,21 @@ public class ServiceReferenceAlarmEsPersistenceDAO extends AbstractPersistenceEs
         return serviceReferenceAlarm;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(ServiceReferenceAlarm streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
-        target.put(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
-        target.put(ServiceReferenceAlarmTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId());
-        target.put(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
-        target.put(ServiceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(ServiceReferenceAlarm streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+            .field(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+            .field(ServiceReferenceAlarmTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId())
+            .field(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId())
+            .field(ServiceReferenceAlarmTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ServiceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ServiceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ServiceReferenceAlarmTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ServiceReferenceAlarmTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ServiceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket());
-        return target;
+            .field(ServiceReferenceAlarmTable.LAST_TIME_BUCKET.getName(), streamData.getLastTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
index ade36d3..d1de511 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/alarm/ServiceReferenceAlarmListEsPersistenceDAO.java
@@ -18,16 +18,16 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.alarm;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarmList;
-import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceReferenceAlarmListTable;
+import org.apache.skywalking.apm.collector.storage.table.alarm.*;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -59,21 +59,21 @@ public class ServiceReferenceAlarmListEsPersistenceDAO extends AbstractPersisten
         return serviceReferenceAlarmList;
     }
 
-    @Override protected Map<String, Object> esStreamDataToEsData(ServiceReferenceAlarmList streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
-        target.put(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
-        target.put(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId());
-        target.put(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
-        target.put(ServiceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue());
+    @Override protected XContentBuilder esStreamDataToEsData(ServiceReferenceAlarmList streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+            .field(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+            .field(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId())
+            .field(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId())
+            .field(ServiceReferenceAlarmListTable.SOURCE_VALUE.getName(), streamData.getSourceValue())
 
-        target.put(ServiceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType());
-        target.put(ServiceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent());
+            .field(ServiceReferenceAlarmListTable.ALARM_TYPE.getName(), streamData.getAlarmType())
+            .field(ServiceReferenceAlarmListTable.ALARM_CONTENT.getName(), streamData.getAlarmContent())
 
-        target.put(ServiceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        return target;
+            .field(ServiceReferenceAlarmListTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @Override protected String timeBucketColumnNameForDelete() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java
index b4f8d8b..2dbfadd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/amp/AbstractApplicationMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.amp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
-import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetric;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetricTable;
 import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
+import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -55,18 +55,19 @@ public abstract class AbstractApplicationMetricEsPersistenceDAO extends Abstract
         return applicationMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(ApplicationMetric streamData) throws IOException {
+        XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationMetricTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(ApplicationMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId());
+            .field(ApplicationMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId())
 
-        MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+            .field(ApplicationMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount())
+            .field(ApplicationMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount())
+            .field(ApplicationMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
 
-        target.put(ApplicationMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount());
-        target.put(ApplicationMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount());
-        target.put(ApplicationMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
+        MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
 
+        target.endObject();
         return target;
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java
index b02bd15..182be94 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ampp/AbstractApplicationMappingEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ampp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMappingTable;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -49,15 +49,14 @@ public abstract class AbstractApplicationMappingEsPersistenceDAO extends Abstrac
         return applicationMapping;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationMapping streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationMappingTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(ApplicationMapping streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationMappingTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(ApplicationMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(ApplicationMappingTable.MAPPING_APPLICATION_ID.getName(), streamData.getMappingApplicationId());
-        target.put(ApplicationMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
-        return target;
+            .field(ApplicationMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(ApplicationMappingTable.MAPPING_APPLICATION_ID.getName(), streamData.getMappingApplicationId())
+            .field(ApplicationMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + ApplicationMappingTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java
index 0ef7935..6246199 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/armp/AbstractApplicationReferenceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.armp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric;
-import org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.application.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -56,19 +56,21 @@ public abstract class AbstractApplicationReferenceMetricEsPersistenceDAO extends
         return applicationReferenceMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ApplicationReferenceMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ApplicationReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override
+    protected final XContentBuilder esStreamDataToEsData(ApplicationReferenceMetric streamData) throws IOException {
+        XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+            .field(ApplicationReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
+            .field(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
 
-        MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
+            .field(ApplicationReferenceMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount())
+            .field(ApplicationReferenceMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount())
+            .field(ApplicationReferenceMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
 
-        target.put(ApplicationReferenceMetricTable.SATISFIED_COUNT.getName(), streamData.getSatisfiedCount());
-        target.put(ApplicationReferenceMetricTable.TOLERATING_COUNT.getName(), streamData.getToleratingCount());
-        target.put(ApplicationReferenceMetricTable.FRUSTRATED_COUNT.getName(), streamData.getFrustratedCount());
+        MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
 
+        target.endObject();
         return target;
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java
index fcae67f..6ffafd6 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/cpu/AbstractCpuMetricEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.cpu;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.jvm.CpuMetric;
-import org.apache.skywalking.apm.collector.storage.table.jvm.CpuMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -52,16 +52,15 @@ public abstract class AbstractCpuMetricEsPersistenceDAO extends AbstractPersiste
         return cpuMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(CpuMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(CpuMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(CpuMetric streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(CpuMetricTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(CpuMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(CpuMetricTable.USAGE_PERCENT.getName(), streamData.getUsagePercent());
-        target.put(CpuMetricTable.TIMES.getName(), streamData.getTimes());
-        target.put(CpuMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
-        return target;
+            .field(CpuMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(CpuMetricTable.USAGE_PERCENT.getName(), streamData.getUsagePercent())
+            .field(CpuMetricTable.TIMES.getName(), streamData.getTimes())
+            .field(CpuMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + CpuMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java
index c215b85..bd1fc53 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/gc/AbstractGCMetricEsPersistenceDAO.java
@@ -18,11 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.gc;
 
-import java.util.*;
+import java.io.IOException;
+import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
 import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -53,18 +55,17 @@ public abstract class AbstractGCMetricEsPersistenceDAO extends AbstractPersisten
         return gcMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(GCMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(GCMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(GCMetric streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(GCMetricTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(GCMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(GCMetricTable.PHRASE.getName(), streamData.getPhrase());
-        target.put(GCMetricTable.COUNT.getName(), streamData.getCount());
-        target.put(GCMetricTable.TIMES.getName(), streamData.getTimes());
-        target.put(GCMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-        target.put(GCMetricTable.DURATION.getName(), streamData.getDuration());
-
-        return target;
+            .field(GCMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(GCMetricTable.PHRASE.getName(), streamData.getPhrase())
+            .field(GCMetricTable.COUNT.getName(), streamData.getCount())
+            .field(GCMetricTable.TIMES.getName(), streamData.getTimes())
+            .field(GCMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .field(GCMetricTable.DURATION.getName(), streamData.getDuration())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + GCMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java
index a9d48f2..0abc207 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/imp/AbstractInstanceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.imp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.instance.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -52,14 +52,15 @@ public abstract class AbstractInstanceMetricEsPersistenceDAO extends AbstractPer
         return instanceMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(InstanceMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(InstanceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
-        target.put(InstanceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(InstanceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
+    @Override protected final XContentBuilder esStreamDataToEsData(InstanceMetric streamData) throws IOException {
+        XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+            .field(InstanceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+            .field(InstanceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(InstanceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
 
         MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
 
+        target.endObject();
         return target;
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java
index 093894b..6543832 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/impp/AbstractInstanceMappingEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.impp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMappingTable;
+import org.apache.skywalking.apm.collector.storage.table.instance.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -50,16 +50,15 @@ public abstract class AbstractInstanceMappingEsPersistenceDAO extends AbstractPe
         return instanceMapping;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(InstanceMapping streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(InstanceMappingTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(InstanceMapping streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(InstanceMappingTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(InstanceMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(InstanceMappingTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(InstanceMappingTable.ADDRESS_ID.getName(), streamData.getAddressId());
-        target.put(InstanceMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
-        return target;
+            .field(InstanceMappingTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(InstanceMappingTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(InstanceMappingTable.ADDRESS_ID.getName(), streamData.getAddressId())
+            .field(InstanceMappingTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + InstanceMappingTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java
index fa474b7..9ae8c87 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/irmp/AbstractInstanceReferenceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.irmp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
-import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.instance.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -54,17 +54,18 @@ public abstract class AbstractInstanceReferenceMetricEsPersistenceDAO extends Ab
         return instanceReferenceMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(InstanceReferenceMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(InstanceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
-
-        target.put(InstanceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(InstanceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
-        target.put(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
+    @Override
+    protected final XContentBuilder esStreamDataToEsData(InstanceReferenceMetric streamData) throws IOException {
+        XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+            .field(InstanceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+            .field(InstanceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(InstanceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+            .field(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
 
         MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
 
+        target.endObject();
         return target;
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java
index 8f12f0d..df85e52 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/memory/AbstractMemoryMetricEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.memory;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryMetric;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -56,20 +56,19 @@ public abstract class AbstractMemoryMetricEsPersistenceDAO extends AbstractPersi
         return memoryMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(MemoryMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(MemoryMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(MemoryMetric streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(MemoryMetricTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(MemoryMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(MemoryMetricTable.IS_HEAP.getName(), streamData.getIsHeap());
-        target.put(MemoryMetricTable.INIT.getName(), streamData.getInit());
-        target.put(MemoryMetricTable.MAX.getName(), streamData.getMax());
-        target.put(MemoryMetricTable.USED.getName(), streamData.getUsed());
-        target.put(MemoryMetricTable.COMMITTED.getName(), streamData.getCommitted());
-        target.put(MemoryMetricTable.TIMES.getName(), streamData.getTimes());
-        target.put(MemoryMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
-        return target;
+            .field(MemoryMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(MemoryMetricTable.IS_HEAP.getName(), streamData.getIsHeap())
+            .field(MemoryMetricTable.INIT.getName(), streamData.getInit())
+            .field(MemoryMetricTable.MAX.getName(), streamData.getMax())
+            .field(MemoryMetricTable.USED.getName(), streamData.getUsed())
+            .field(MemoryMetricTable.COMMITTED.getName(), streamData.getCommitted())
+            .field(MemoryMetricTable.TIMES.getName(), streamData.getTimes())
+            .field(MemoryMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + MemoryMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
index 12f87cd..20dd4e9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/mpool/AbstractMemoryPoolMetricEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.mpool;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric;
-import org.apache.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.jvm.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -59,20 +59,18 @@ public abstract class AbstractMemoryPoolMetricEsPersistenceDAO extends AbstractP
     }
 
     @Override
-    protected final Map<String, Object> esStreamDataToEsData(MemoryPoolMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(MemoryPoolMetricTable.METRIC_ID.getName(), streamData.getMetricId());
-
-        target.put(MemoryPoolMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(MemoryPoolMetricTable.POOL_TYPE.getName(), streamData.getPoolType());
-        target.put(MemoryPoolMetricTable.INIT.getName(), streamData.getInit());
-        target.put(MemoryPoolMetricTable.MAX.getName(), streamData.getMax());
-        target.put(MemoryPoolMetricTable.USED.getName(), streamData.getUsed());
-        target.put(MemoryPoolMetricTable.COMMITTED.getName(), streamData.getCommitted());
-        target.put(MemoryPoolMetricTable.TIMES.getName(), streamData.getTimes());
-        target.put(MemoryPoolMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
-        return target;
+    protected final XContentBuilder esStreamDataToEsData(MemoryPoolMetric streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(MemoryPoolMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+            .field(MemoryPoolMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(MemoryPoolMetricTable.POOL_TYPE.getName(), streamData.getPoolType())
+            .field(MemoryPoolMetricTable.INIT.getName(), streamData.getInit())
+            .field(MemoryPoolMetricTable.MAX.getName(), streamData.getMax())
+            .field(MemoryPoolMetricTable.USED.getName(), streamData.getUsed())
+            .field(MemoryPoolMetricTable.COMMITTED.getName(), streamData.getCommitted())
+            .field(MemoryPoolMetricTable.TIMES.getName(), streamData.getTimes())
+            .field(MemoryPoolMetricTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + MemoryPoolMetricTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java
index d6fb4a5..a37a76d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/rtd/AbstractResponseTimeDistributionEsPersistenceDAO.java
@@ -18,13 +18,13 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.rtd;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.global.ResponseTimeDistribution;
-import org.apache.skywalking.apm.collector.storage.table.global.ResponseTimeDistributionTable;
+import org.apache.skywalking.apm.collector.storage.table.global.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -53,19 +53,19 @@ public abstract class AbstractResponseTimeDistributionEsPersistenceDAO extends A
         return responseTimeDistribution;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ResponseTimeDistribution streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ResponseTimeDistributionTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override
+    protected final XContentBuilder esStreamDataToEsData(ResponseTimeDistribution streamData) throws IOException {
+        return XContentFactory.jsonBuilder().startObject()
+            .field(ResponseTimeDistributionTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(ResponseTimeDistributionTable.STEP.getName(), streamData.getStep());
+            .field(ResponseTimeDistributionTable.STEP.getName(), streamData.getStep())
 
-        target.put(ResponseTimeDistributionTable.CALLS.getName(), streamData.getCalls());
-        target.put(ResponseTimeDistributionTable.ERROR_CALLS.getName(), streamData.getErrorCalls());
-        target.put(ResponseTimeDistributionTable.SUCCESS_CALLS.getName(), streamData.getSuccessCalls());
+            .field(ResponseTimeDistributionTable.CALLS.getName(), streamData.getCalls())
+            .field(ResponseTimeDistributionTable.ERROR_CALLS.getName(), streamData.getErrorCalls())
+            .field(ResponseTimeDistributionTable.SUCCESS_CALLS.getName(), streamData.getSuccessCalls())
 
-        target.put(ResponseTimeDistributionTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
-
-        return target;
+            .field(ResponseTimeDistributionTable.TIME_BUCKET.getName(), streamData.getTimeBucket())
+            .endObject();
     }
 
     @GraphComputingMetric(name = "/persistence/get/" + ResponseTimeDistributionTable.TABLE)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java
index 4530ccc..b07f6a5 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/smp/AbstractServiceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.smp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetric;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.service.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -53,16 +53,17 @@ public abstract class AbstractServiceMetricEsPersistenceDAO extends AbstractPers
         return serviceMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ServiceMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ServiceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
+    @Override protected final XContentBuilder esStreamDataToEsData(ServiceMetric streamData) throws IOException {
+        XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+            .field(ServiceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
 
-        target.put(ServiceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId());
-        target.put(ServiceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId());
-        target.put(ServiceMetricTable.SERVICE_ID.getName(), streamData.getServiceId());
+            .field(ServiceMetricTable.APPLICATION_ID.getName(), streamData.getApplicationId())
+            .field(ServiceMetricTable.INSTANCE_ID.getName(), streamData.getInstanceId())
+            .field(ServiceMetricTable.SERVICE_ID.getName(), streamData.getServiceId());
 
         MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
 
+        target.endObject();
         return target;
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java
index 3a1a8df..1d9078a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/srmp/AbstractServiceReferenceMetricEsPersistenceDAO.java
@@ -18,14 +18,14 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.srmp;
 
-import java.util.HashMap;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
 import org.apache.skywalking.apm.collector.storage.es.MetricTransformUtil;
 import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
-import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
+import org.apache.skywalking.apm.collector.storage.table.service.*;
+import org.elasticsearch.common.xcontent.*;
 
 /**
  * @author peng-yongsheng
@@ -56,19 +56,20 @@ public abstract class AbstractServiceReferenceMetricEsPersistenceDAO extends Abs
         return serviceReferenceMetric;
     }
 
-    @Override protected final Map<String, Object> esStreamDataToEsData(ServiceReferenceMetric streamData) {
-        Map<String, Object> target = new HashMap<>();
-        target.put(ServiceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId());
-
-        target.put(ServiceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId());
-        target.put(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId());
-        target.put(ServiceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId());
-        target.put(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId());
-        target.put(ServiceReferenceMetricTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId());
-        target.put(ServiceReferenceMetricTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
+    @Override
+    protected final XContentBuilder esStreamDataToEsData(ServiceReferenceMetric streamData) throws IOException {
+        XContentBuilder target = XContentFactory.jsonBuilder().startObject()
+            .field(ServiceReferenceMetricTable.METRIC_ID.getName(), streamData.getMetricId())
+            .field(ServiceReferenceMetricTable.FRONT_APPLICATION_ID.getName(), streamData.getFrontApplicationId())
+            .field(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID.getName(), streamData.getBehindApplicationId())
+            .field(ServiceReferenceMetricTable.FRONT_INSTANCE_ID.getName(), streamData.getFrontInstanceId())
+            .field(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID.getName(), streamData.getBehindInstanceId())
+            .field(ServiceReferenceMetricTable.FRONT_SERVICE_ID.getName(), streamData.getFrontServiceId())
+            .field(ServiceReferenceMetricTable.BEHIND_SERVICE_ID.getName(), streamData.getBehindServiceId());
 
         MetricTransformUtil.INSTANCE.esStreamDataToEsData(streamData, target);
 
+        target.endObject();
         return target;
     }
 
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java
index 93ee042..a2ce032 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationComponentEsUIDAO.java
@@ -52,7 +52,9 @@ public class ApplicationComponentEsUIDAO extends EsDAO implements IApplicationCo
         SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
         searchRequestBuilder.setTypes(ApplicationComponentTable.TABLE_TYPE);
         searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
-        searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(ApplicationComponentTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
+        // Apply the time range in the query's filter context rather than as a post filter: a post filter only
+        // trims hits after aggregations are computed, so the aggregation added below would ignore the range.
+        searchRequestBuilder.setQuery(QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery(ApplicationComponentTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket)));
         searchRequestBuilder.setSize(0);
 
         searchRequestBuilder.addAggregation(AggregationBuilders.terms(ApplicationComponentTable.COMPONENT_ID.getName()).field(ApplicationComponentTable.COMPONENT_ID.getName()).size(100)
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java
index 4164474..676263f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMappingEsUIDAO.java
@@ -52,7 +52,9 @@ public class ApplicationMappingEsUIDAO extends EsDAO implements IApplicationMapp
         SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(tableName);
         searchRequestBuilder.setTypes(ApplicationMappingTable.TABLE_TYPE);
         searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
-        searchRequestBuilder.setQuery(QueryBuilders.rangeQuery(ApplicationMappingTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
+        // Apply the time range in the query's filter context rather than as a post filter: a post filter only
+        // trims hits after aggregations are computed, so the aggregation added below would ignore the range.
+        searchRequestBuilder.setQuery(QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery(ApplicationMappingTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket)));
         searchRequestBuilder.setSize(0);
 
         searchRequestBuilder.addAggregation(
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
index 0fc5d87..4da0071 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationMetricEsUIDAO.java
@@ -104,7 +104,9 @@ public class ApplicationMetricEsUIDAO extends EsDAO implements IApplicationMetri
         boolQuery.must().add(QueryBuilders.rangeQuery(ApplicationMetricTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket));
         boolQuery.must().add(QueryBuilders.termQuery(ApplicationMetricTable.SOURCE_VALUE.getName(), metricSource.getValue()));
 
-        searchRequestBuilder.setQuery(boolQuery);
+        // Keep the conditions in the query (filter context, no scoring) instead of a post filter: with size 0 no
+        // hits are returned, and aggregations are computed before a post filter is applied, so it would not limit them.
+        searchRequestBuilder.setQuery(QueryBuilders.boolQuery().filter(boolQuery));
         searchRequestBuilder.setSize(0);
 
         TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms(ApplicationMetricTable.APPLICATION_ID.getName()).field(ApplicationMetricTable.APPLICATION_ID.getName()).size(100);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java
index 3dd84de..169f067 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/ApplicationReferenceMetricEsUIDAO.java
@@ -71,7 +71,10 @@ public class ApplicationReferenceMetricEsUIDAO extends EsDAO implements IApplica
             boolQuery.must().add(applicationBoolQuery);
         }
 
-        searchRequestBuilder.setQuery(boolQuery);
+        // Keep the conditions in the query (filter context, no scoring) instead of a post filter: size is 0, so no
+        // hits are returned, and a post filter is applied only after aggregations have been computed.
+        // NOTE(review): buildMetrics presumably reads aggregations from this request - confirm.
+        searchRequestBuilder.setQuery(QueryBuilders.boolQuery().filter(boolQuery));
         searchRequestBuilder.setSize(0);
 
         return buildMetrics(searchRequestBuilder);
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java
index 30f20ef..00e0ff9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/InstanceEsUIDAO.java
@@ -176,7 +176,7 @@ public class InstanceEsUIDAO extends EsDAO implements IInstanceUIDAO {
         timeBoolQuery.should().add(boolQuery2);
 
         boolQuery.must().add(timeBoolQuery);
-        searchRequestBuilder.setQuery(boolQuery);
+        searchRequestBuilder.setPostFilter(boolQuery);
 
         SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
         SearchHit[] searchHits = searchResponse.getHits().getHits();
@@ -190,7 +190,7 @@ public class InstanceEsUIDAO extends EsDAO implements IInstanceUIDAO {
         searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
         searchRequestBuilder.setSize(1);
 
-        searchRequestBuilder.setQuery(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
+        searchRequestBuilder.setPostFilter(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
         searchRequestBuilder.addSort(SortBuilders.fieldSort(InstanceTable.REGISTER_TIME.getName()).order(SortOrder.ASC));
 
         SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
@@ -210,7 +210,7 @@ public class InstanceEsUIDAO extends EsDAO implements IInstanceUIDAO {
         searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
         searchRequestBuilder.setSize(1);
 
-        searchRequestBuilder.setQuery(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
+        searchRequestBuilder.setPostFilter(QueryBuilders.termQuery(InstanceTable.APPLICATION_ID.getName(), applicationId));
         searchRequestBuilder.addSort(SortBuilders.fieldSort(InstanceTable.HEARTBEAT_TIME.getName()).order(SortOrder.DESC));
 
         SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java
index 3a11e32..0de2a2c 100644
--- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/dao/BatchH2DAO.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
  */
 public class BatchH2DAO extends H2DAO implements IBatchDAO {
 
-    private final Logger logger = LoggerFactory.getLogger(BatchH2DAO.class);
+    private static final Logger logger = LoggerFactory.getLogger(BatchH2DAO.class);
 
     public BatchH2DAO(H2Client client) {
         super(client);
@@ -44,7 +44,10 @@ public class BatchH2DAO extends H2DAO implements IBatchDAO {
     @Override
     public void batchPersistence(List<?> batchCollection) {
         if (batchCollection != null && batchCollection.size() > 0) {
-            logger.debug("the batch collection size is {}", batchCollection.size());
+            if (logger.isDebugEnabled()) {
+                logger.debug("the batch collection size is {}", batchCollection.size());
+            }
+
             Connection conn;
             final Map<String, PreparedStatement> batchSqls = new HashMap<>();
             try {
@@ -63,7 +66,10 @@ public class BatchH2DAO extends H2DAO implements IBatchDAO {
 
                     Object[] params = e.getParams();
                     if (params != null) {
-                        logger.debug("the sql is {}, params size is {}, params: {}", e.getSql(), params.length, params);
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("the sql is {}, params size is {}, params: {}", e.getSql(), params.length, params);
+                        }
+
                         for (int i = 0; i < params.length; i++) {
                             ps.setObject(i + 1, params[i]);
                         }
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java
index 56f36c4..2b4ea52 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/handler/GraphQLHandler.java
@@ -19,43 +19,23 @@
 package org.apache.skywalking.apm.collector.ui.jetty.handler;
 
 import com.coxautodev.graphql.tools.SchemaParser;
-import com.google.gson.Gson;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
+import com.google.gson.*;
 import com.google.gson.reflect.TypeToken;
-import graphql.ExecutionInput;
-import graphql.ExecutionResult;
-import graphql.GraphQL;
-import graphql.GraphQLError;
+import graphql.*;
 import graphql.schema.GraphQLSchema;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.Type;
-import java.util.List;
-import java.util.Map;
+import java.io.*;
+import java.util.*;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
-import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
 import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
-import org.apache.skywalking.apm.collector.storage.ui.application.ApplicationNode;
-import org.apache.skywalking.apm.collector.storage.ui.application.ConjecturalNode;
+import org.apache.skywalking.apm.collector.storage.ui.application.*;
 import org.apache.skywalking.apm.collector.storage.ui.common.VisualUserNode;
 import org.apache.skywalking.apm.collector.storage.ui.service.ServiceNode;
-import org.apache.skywalking.apm.collector.ui.graphql.VersionMutation;
-import org.apache.skywalking.apm.collector.ui.graphql.VersionQuery;
+import org.apache.skywalking.apm.collector.ui.graphql.*;
 import org.apache.skywalking.apm.collector.ui.mutation.ConfigMutation;
-import org.apache.skywalking.apm.collector.ui.query.AlarmQuery;
-import org.apache.skywalking.apm.collector.ui.query.ApplicationQuery;
-import org.apache.skywalking.apm.collector.ui.query.ConfigQuery;
-import org.apache.skywalking.apm.collector.ui.query.OverViewLayerQuery;
-import org.apache.skywalking.apm.collector.ui.query.ServerQuery;
-import org.apache.skywalking.apm.collector.ui.query.ServiceQuery;
-import org.apache.skywalking.apm.collector.ui.query.TraceQuery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.apm.collector.ui.query.*;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -96,30 +76,29 @@ public class GraphQLHandler extends JettyJsonHandler {
         return "/graphql";
     }
 
-    @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
+    @Override protected JsonElement doGet(HttpServletRequest req) {
         return execute(req.getParameter(QUERY), null);
     }
 
-    @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException {
+    @Override protected JsonElement doPost(HttpServletRequest req) throws IOException {
         BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
         String line;
-        String request = "";
+        StringBuilder request = new StringBuilder();
         while ((line = reader.readLine()) != null) {
-            request += line;
+            request.append(line);
         }
 
-        JsonObject requestJson = gson.fromJson(request, JsonObject.class);
+        JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);
 
-        Type mapType = new TypeToken<Map<String, Object>>() { }.getType();
-
-        return execute(requestJson.get(QUERY).getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapType));
+        return execute(requestJson.get(QUERY).getAsString(), gson.fromJson(requestJson.get(VARIABLES), new TypeToken<Map<String, Object>>() {
+        }.getType()));
     }
 
     private JsonObject execute(String request, Map<String, Object> variables) {
         try {
             ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(request).variables(variables).build();
             ExecutionResult executionResult = graphQL.execute(executionInput);
-            logger.info("Execution result is {}", executionResult);
+            logger.debug("Execution result is {}", executionResult);
             Object data = executionResult.getData();
             List<GraphQLError> errors = executionResult.getErrors();
 
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java
index 61159c3..535f49c 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ApplicationTopologyService.java
@@ -75,7 +75,7 @@ public class ApplicationTopologyService {
 
         TopologyBuilder builder = new TopologyBuilder(moduleManager);
 
-        Topology topology = builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, step, startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
+        Topology topology = builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, startSecondTimeBucket, endSecondTimeBucket);
 
         Set<Integer> nodeIds = new HashSet<>();
         topology.getCalls().forEach(call -> {
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java
index 6616df6..d0d1f04 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/ClusterTopologyService.java
@@ -67,6 +67,6 @@ public class ClusterTopologyService {
 
         TopologyBuilder builder = new TopologyBuilder(moduleManager);
 
-        return builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, step, startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
+        return builder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, startSecondTimeBucket, endSecondTimeBucket);
     }
 }
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java
index 6ef3f36..c23f579 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilder.java
@@ -28,7 +28,6 @@ import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.core.util.*;
 import org.apache.skywalking.apm.collector.storage.dao.ui.*;
 import org.apache.skywalking.apm.collector.storage.table.register.Application;
-import org.apache.skywalking.apm.collector.storage.ui.alarm.Alarm;
 import org.apache.skywalking.apm.collector.storage.ui.application.*;
 import org.apache.skywalking.apm.collector.storage.ui.common.*;
 import org.apache.skywalking.apm.collector.ui.utils.*;
@@ -44,14 +43,12 @@ class TopologyBuilder {
     private final ApplicationCacheService applicationCacheService;
     private final ServerService serverService;
     private final DateBetweenService dateBetweenService;
-    private final AlarmService alarmService;
     private final IComponentLibraryCatalogService componentLibraryCatalogService;
 
     TopologyBuilder(ModuleManager moduleManager) {
         this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class);
         this.serverService = new ServerService(moduleManager);
         this.dateBetweenService = new DateBetweenService(moduleManager);
-        this.alarmService = new AlarmService(moduleManager);
         this.componentLibraryCatalogService = moduleManager.find(ConfigurationModule.NAME).getService(IComponentLibraryCatalogService.class);
     }
 
@@ -60,17 +57,17 @@ class TopologyBuilder {
         List<IApplicationMetricUIDAO.ApplicationMetric> applicationMetrics,
         List<IApplicationReferenceMetricUIDAO.ApplicationReferenceMetric> callerReferenceMetric,
         List<IApplicationReferenceMetricUIDAO.ApplicationReferenceMetric> calleeReferenceMetric,
-        Step step, long startTimeBucket, long endTimeBucket, long startSecondTimeBucket, long endSecondTimeBucket) {
+        long startSecondTimeBucket, long endSecondTimeBucket) {
         Map<Integer, String> nodeCompMap = buildNodeCompMap(applicationComponents);
         Map<Integer, String> conjecturalNodeCompMap = buildConjecturalNodeCompMap(applicationComponents);
         Map<Integer, Integer> mappings = changeMapping2Map(applicationMappings);
-
         filterZeroSourceOrTargetReference(callerReferenceMetric);
         filterZeroSourceOrTargetReference(calleeReferenceMetric);
-
         calleeReferenceMetric = calleeReferenceMetricFilter(calleeReferenceMetric);
 
         List<Node> nodes = new LinkedList<>();
+        Map<Integer, Integer> applicationMinuteBetweenMap = new HashMap<>();
+        Map<Integer, Integer> numberOfServer = new HashMap<>();
         applicationMetrics.forEach(applicationMetric -> {
             int applicationId = applicationMetric.getId();
             Application application = applicationCacheService.getApplicationById(applicationId);
@@ -81,23 +78,14 @@ class TopologyBuilder {
 
             applicationNode.setSla(SLACalculator.INSTANCE.calculate(applicationMetric.getErrorCalls(), applicationMetric.getCalls()));
             try {
-                applicationNode.setCpm(applicationMetric.getCalls() / dateBetweenService.minutesBetween(applicationId, startSecondTimeBucket, endSecondTimeBucket));
+                applicationNode.setCpm(applicationMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, applicationId, startSecondTimeBucket, endSecondTimeBucket));
             } catch (ParseException e) {
                 logger.error(e.getMessage(), e);
             }
             applicationNode.setAvgResponseTime(applicationMetric.getDurations() / applicationMetric.getCalls());
             applicationNode.setApdex(ApdexCalculator.INSTANCE.calculate(applicationMetric.getSatisfiedCount(), applicationMetric.getToleratingCount(), applicationMetric.getFrustratedCount()));
             applicationNode.setAlarm(false);
-            try {
-                Alarm alarm = alarmService.loadApplicationAlarmList(Const.EMPTY_STRING, applicationId, step, startTimeBucket, endTimeBucket, 1, 0);
-                if (alarm.getItems().size() > 0) {
-                    applicationNode.setAlarm(true);
-                }
-            } catch (ParseException e) {
-                logger.error(e.getMessage(), e);
-            }
-
-            applicationNode.setNumOfServer(serverService.getAllServer(applicationId, startSecondTimeBucket, endSecondTimeBucket).size());
+            applicationNode.setNumOfServer(getNumberOfServer(numberOfServer, applicationId, startSecondTimeBucket, endSecondTimeBucket));
             nodes.add(applicationNode);
         });
 
@@ -139,7 +127,7 @@ class TopologyBuilder {
             call.setAlert(false);
             call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
             try {
-                call.setCpm(referenceMetric.getCalls() / dateBetweenService.minutesBetween(source.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
+                call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, source.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
             } catch (ParseException e) {
                 logger.error(e.getMessage(), e);
             }
@@ -186,7 +174,7 @@ class TopologyBuilder {
                 call.setCallType(nodeCompMap.get(referenceMetric.getTarget()));
             }
             try {
-                call.setCpm(referenceMetric.getCalls() / dateBetweenService.minutesBetween(target.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
+                call.setCpm(referenceMetric.getCalls() / getApplicationMinuteBetween(applicationMinuteBetweenMap, target.getApplicationId(), startSecondTimeBucket, endSecondTimeBucket));
             } catch (ParseException e) {
                 logger.error(e.getMessage(), e);
             }
@@ -257,4 +245,28 @@ class TopologyBuilder {
             }
         }
     }
+
+    private int getApplicationMinuteBetween(Map<Integer, Integer> applicationMinuteBetweenMap, int applicationId,
+        long startSecondTimeBucket,
+        long endSecondTimeBucket) throws ParseException {
+        if (applicationMinuteBetweenMap.containsKey(applicationId)) {
+            return applicationMinuteBetweenMap.get(applicationId);
+        } else {
+            int applicationMinuteBetween = dateBetweenService.minutesBetween(applicationId, startSecondTimeBucket, endSecondTimeBucket);
+            applicationMinuteBetweenMap.put(applicationId, applicationMinuteBetween);
+            return applicationMinuteBetween;
+        }
+    }
+
+    private int getNumberOfServer(Map<Integer, Integer> numberOfServerMap, int applicationId,
+        long startSecondTimeBucket,
+        long endSecondTimeBucket) {
+        if (numberOfServerMap.containsKey(applicationId)) {
+            return numberOfServerMap.get(applicationId);
+        } else {
+            int numberOfServer = serverService.getAllServer(applicationId, startSecondTimeBucket, endSecondTimeBucket).size();
+            numberOfServerMap.put(applicationId, numberOfServer);
+            return numberOfServer;
+        }
+    }
 }
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java
index 35d23e3..d04376e 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/TopologyBuilderTest.java
@@ -59,7 +59,6 @@ public class TopologyBuilderTest {
         alarmService = mock(AlarmService.class);
         dateBetweenService = mock(DateBetweenService.class);
         Whitebox.setInternalState(topologyBuilder, "applicationCacheService", applicationCacheService);
-        Whitebox.setInternalState(topologyBuilder, "alarmService", alarmService);
         Whitebox.setInternalState(topologyBuilder, "dateBetweenService", dateBetweenService);
         duration = new Duration();
         duration.setEnd("2018-02");
@@ -129,7 +128,7 @@ public class TopologyBuilderTest {
             return alarm;
         });
         when(dateBetweenService.minutesBetween(anyInt(), anyLong(), anyLong())).then(invocation -> 20L);
-        Topology topology = topologyBuilder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, duration.getStep(), startTimeBucket, endTimeBucket, startSecondTimeBucket, endSecondTimeBucket);
+        Topology topology = topologyBuilder.build(applicationComponents, applicationMappings, applicationMetrics, callerReferenceMetric, calleeReferenceMetric, startSecondTimeBucket, endSecondTimeBucket);
         Assert.assertNotNull(topology);
     }