You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/09/19 07:22:04 UTC

[incubator-skywalking] branch master updated: Add record persistence stream analysis. (#1683)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 13fa303  Add record persistence stream analysis. (#1683)
13fa303 is described below

commit 13fa30307a08b2a3cab29e797eaad19157bcc72c
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Sep 19 15:21:56 2018 +0800

    Add record persistence stream analysis. (#1683)
---
 .../oap/server/core/CoreModuleProvider.java        |   2 +
 .../server/core/analysis/DispatcherManager.java    |   3 +
 .../server/core/analysis/data/MergeDataCache.java  |   6 +-
 .../core/analysis/data/MergeDataCollection.java    |  27 +++---
 .../core/analysis/data/NonMergeDataCache.java      |  12 +--
 .../core/analysis/data/NonMergeDataCollection.java |  26 +++--
 .../data/{Collection.java => SWCollection.java}    |  12 ++-
 .../oap/server/core/analysis/data/Window.java      |  18 ++--
 .../analysis/manual/segment/SegmentDispatcher.java |  45 +++++++++
 .../analysis/manual/segment/SegmentRecord.java     | 103 ++++++++++++++++++++
 .../{data/Collection.java => record/Record.java}   |  26 ++---
 .../analysis/record/annotation/RecordType.java}    |  15 ++-
 .../record/annotation/RecordTypeListener.java}     |  37 +++----
 .../analysis/worker/IndicatorAggregateWorker.java  |   2 +-
 .../analysis/worker/IndicatorPersistentWorker.java |  56 ++---------
 ...ersistentWorker.java => PersistenceWorker.java} |  80 +++-------------
 .../analysis/worker/RecordPersistentWorker.java    |  68 +++++++++++++
 .../server/core/analysis/worker/RecordProcess.java |  62 ++++++++++++
 .../skywalking/oap/server/core/source/Scope.java   |   3 +-
 .../core/source/{Scope.java => Segment.java}       |  27 +++++-
 .../Collection.java => storage/IRecordDAO.java}    |  25 ++---
 .../oap/server/core/storage/PersistenceTimer.java  |   8 +-
 .../oap/server/core/storage/StorageDAO.java        |   3 +
 .../oap/server/library/util/CollectionUtils.java   |   8 ++
 .../trace/provider/TraceModuleProvider.java        |   2 +
 .../trace/provider/parser/SegmentParse.java        |  12 +--
 .../provider/parser/decorator/SegmentCoreInfo.java |  70 +-------------
 .../provider/parser/listener/SpanListener.java     |   2 +-
 .../listener/segment/SegmentSpanListener.java      | 106 +++++++++++++++++++++
 .../server-starter/src/main/resources/log4j2.xml   |   4 +-
 .../elasticsearch/base/ColumnTypeEsMapping.java    |   2 +
 .../plugin/elasticsearch/base/RecordEsDAO.java     |  60 ++++++++++++
 .../plugin/elasticsearch/base/StorageEsDAO.java    |   5 +
 .../elasticsearch/base/StorageEsInstaller.java     |   7 +-
 34 files changed, 631 insertions(+), 313 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index d22648e..91d0b33 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core;
 
 import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
+import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
 import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.cluster.*;
@@ -115,6 +116,7 @@ public class CoreModuleProvider extends ModuleProvider {
         annotationScan.registerListener(streamAnnotationListener);
         annotationScan.registerListener(new IndicatorTypeListener(getManager()));
         annotationScan.registerListener(new InventoryTypeListener(getManager()));
+        annotationScan.registerListener(new RecordTypeListener(getManager()));
 
         this.remoteClientManager = new RemoteClientManager(getManager());
         this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index a676500..b216ff8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
 import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher;
 import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
 import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
 import org.apache.skywalking.oap.server.core.source.*;
@@ -50,6 +51,8 @@ public class DispatcherManager {
 
         this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
 
+        this.dispatcherMap.put(Scope.Segment, new SourceDispatcher[] {new SegmentDispatcher()});
+
         this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
         this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
         this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
index b5de443..778de6b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
@@ -23,11 +23,11 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
 /**
  * @author peng-yongsheng
  */
-public class MergeDataCache<INDICATOR extends Indicator> extends Window<MergeDataCollection<INDICATOR>> implements DataCache {
+public class MergeDataCache<INDICATOR extends Indicator> extends Window<INDICATOR> implements DataCache {
 
-    private MergeDataCollection<INDICATOR> lockedMergeDataCollection;
+    private SWCollection<INDICATOR> lockedMergeDataCollection;
 
-    @Override public MergeDataCollection<INDICATOR> collectionInstance() {
+    @Override public SWCollection<INDICATOR> collectionInstance() {
         return new MergeDataCollection<>();
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
index bd53c39..5a703e5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
@@ -24,13 +24,14 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
 /**
  * @author peng-yongsheng
  */
-public class MergeDataCollection<STREAM_DATA extends StreamData> implements Collection<Map<STREAM_DATA, STREAM_DATA>> {
-    private Map<STREAM_DATA, STREAM_DATA> data;
+public class MergeDataCollection<STREAM_DATA extends StreamData> implements SWCollection<STREAM_DATA> {
+
+    private Map<STREAM_DATA, STREAM_DATA> collection;
     private volatile boolean writing;
     private volatile boolean reading;
 
     MergeDataCollection() {
-        this.data = new HashMap<>();
+        this.collection = new HashMap<>();
         this.writing = false;
         this.reading = false;
     }
@@ -59,27 +60,27 @@ public class MergeDataCollection<STREAM_DATA extends StreamData> implements Coll
         return reading;
     }
 
-    boolean containsKey(STREAM_DATA key) {
-        return data.containsKey(key);
+    @Override public boolean containsKey(STREAM_DATA key) {
+        return collection.containsKey(key);
     }
 
-    void put(STREAM_DATA value) {
-        data.put(value, value);
+    @Override public void put(STREAM_DATA value) {
+        collection.put(value, value);
     }
 
-    public STREAM_DATA get(STREAM_DATA key) {
-        return data.get(key);
+    @Override public STREAM_DATA get(STREAM_DATA key) {
+        return collection.get(key);
     }
 
     @Override public int size() {
-        return data.size();
+        return collection.size();
     }
 
     @Override public void clear() {
-        data.clear();
+        collection.clear();
     }
 
-    public Map<STREAM_DATA, STREAM_DATA> collection() {
-        return data;
+    @Override public Collection<STREAM_DATA> collection() {
+        return collection.values();
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
index ab40f7d..238b127 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
@@ -18,21 +18,21 @@
 
 package org.apache.skywalking.oap.server.core.analysis.data;
 
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
 
 /**
  * @author peng-yongsheng
  */
-public class NonMergeDataCache<STREAM_DATA extends StreamData> extends Window<NonMergeDataCollection<STREAM_DATA>> implements DataCache {
+public class NonMergeDataCache<STORAGE_DATA extends StorageData> extends Window<STORAGE_DATA> implements DataCache {
 
-    private NonMergeDataCollection<STREAM_DATA> lockedMergeDataCollection;
+    private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
 
-    @Override public NonMergeDataCollection<STREAM_DATA> collectionInstance() {
+    @Override public SWCollection<STORAGE_DATA> collectionInstance() {
         return new NonMergeDataCollection<>();
     }
 
-    public void add(STREAM_DATA data) {
-        lockedMergeDataCollection.add(data);
+    public void add(STORAGE_DATA data) {
+        lockedMergeDataCollection.put(data);
     }
 
     @Override public void writing() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
index b0498c3..2a4ac3a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
@@ -19,19 +19,19 @@
 package org.apache.skywalking.oap.server.core.analysis.data;
 
 import java.util.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
 
 /**
  * @author peng-yongsheng
  */
-public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements Collection<List<STREAM_DATA>> {
+public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements SWCollection<STORAGE_DATA> {
 
-    private final List<STREAM_DATA> data;
+    private final List<STORAGE_DATA> data;
     private volatile boolean writing;
     private volatile boolean reading;
 
     NonMergeDataCollection() {
-        this.data = new LinkedList<>();
+        this.data = new ArrayList<>();
         this.writing = false;
         this.reading = false;
     }
@@ -60,10 +60,6 @@ public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements C
         return reading;
     }
 
-    void add(STREAM_DATA value) {
-        data.add(value);
-    }
-
     @Override public int size() {
         return data.size();
     }
@@ -72,7 +68,19 @@ public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements C
         data.clear();
     }
 
-    public List<STREAM_DATA> collection() {
+    @Override public boolean containsKey(STORAGE_DATA key) {
+        throw new UnsupportedOperationException("None merge data collection not support containsKey operation.");
+    }
+
+    @Override public STORAGE_DATA get(STORAGE_DATA key) {
+        throw new UnsupportedOperationException("None merge data collection not support get operation.");
+    }
+
+    @Override public void put(STORAGE_DATA value) {
+        data.add(value);
+    }
+
+    @Override public Collection<STORAGE_DATA> collection() {
         return data;
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
similarity index 85%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
index f1d52a8..032563d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
@@ -18,10 +18,12 @@
 
 package org.apache.skywalking.oap.server.core.analysis.data;
 
+import java.util.Collection;
+
 /**
  * @author peng-yongsheng
  */
-public interface Collection<Data> {
+public interface SWCollection<DATA> {
 
     void reading();
 
@@ -39,5 +41,11 @@ public interface Collection<Data> {
 
     void finishWriting();
 
-    Data collection();
+    Collection<DATA> collection();
+
+    boolean containsKey(DATA key);
+
+    DATA get(DATA key);
+
+    void put(DATA value);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
index 38570b3..6f9486d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
@@ -23,22 +23,22 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * @author peng-yongsheng
  */
-public abstract class Window<WINDOW_COLLECTION extends Collection> {
+public abstract class Window<DATA> {
 
     private AtomicInteger windowSwitch = new AtomicInteger(0);
 
-    private WINDOW_COLLECTION pointer;
+    private SWCollection<DATA> pointer;
 
-    private WINDOW_COLLECTION windowDataA;
-    private WINDOW_COLLECTION windowDataB;
+    private SWCollection<DATA> windowDataA;
+    private SWCollection<DATA> windowDataB;
 
-    protected Window() {
+    Window() {
         this.windowDataA = collectionInstance();
         this.windowDataB = collectionInstance();
         this.pointer = windowDataA;
     }
 
-    public abstract WINDOW_COLLECTION collectionInstance();
+    public abstract SWCollection<DATA> collectionInstance();
 
     public boolean trySwitchPointer() {
         return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
@@ -57,7 +57,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
         getLast().reading();
     }
 
-    protected WINDOW_COLLECTION getCurrentAndWriting() {
+    SWCollection<DATA> getCurrentAndWriting() {
         if (pointer == windowDataA) {
             windowDataA.writing();
             return windowDataA;
@@ -67,7 +67,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
         }
     }
 
-    private WINDOW_COLLECTION getCurrent() {
+    private SWCollection<DATA> getCurrent() {
         return pointer;
     }
 
@@ -75,7 +75,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
         return getCurrent().size();
     }
 
-    public WINDOW_COLLECTION getLast() {
+    public SWCollection<DATA> getLast() {
         if (pointer == windowDataA) {
             return windowDataB;
         } else {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
new file mode 100644
index 0000000..9a4c51f
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.segment;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
+import org.apache.skywalking.oap.server.core.source.Segment;
+
+/**
+ * @author peng-yongsheng
+ */
+public class SegmentDispatcher implements SourceDispatcher<Segment> {
+
+    @Override public void dispatch(Segment source) {
+        SegmentRecord segment = new SegmentRecord();
+        segment.setSegmentId(source.getSegmentId());
+        segment.setTraceId(source.getTraceId());
+        segment.setServiceId(source.getServiceId());
+        segment.setEndpointName(source.getEndpointName());
+        segment.setStartTime(source.getStartTime());
+        segment.setEndTime(source.getEndTime());
+        segment.setLatency(source.getLatency());
+        segment.setIsError(source.getIsError());
+        segment.setDataBinary(source.getDataBinary());
+        segment.setTimeBucket(source.getTimeBucket());
+
+        RecordProcess.INSTANCE.in(segment);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
new file mode 100644
index 0000000..ba5fc88
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.segment;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+/**
+ * @author peng-yongsheng
+ */
+@RecordType
+@StorageEntity(name = SegmentRecord.INDEX_NAME, builder = SegmentRecord.Builder.class)
+public class SegmentRecord extends Record {
+
+    public static final String INDEX_NAME = "segment";
+    public static final String SEGMENT_ID = "segment_id";
+    public static final String TRACE_ID = "trace_id";
+    public static final String SERVICE_ID = "service_id";
+    public static final String ENDPOINT_NAME = "endpoint_name";
+    public static final String START_TIME = "start_time";
+    public static final String END_TIME = "end_time";
+    public static final String LATENCY = "latency";
+    public static final String IS_ERROR = "is_error";
+    public static final String DATA_BINARY = "data_binary";
+
+    @Setter @Getter @Column(columnName = SEGMENT_ID) @IDColumn private String segmentId;
+    @Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
+    @Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
+    @Setter @Getter @Column(columnName = ENDPOINT_NAME) @IDColumn private String endpointName;
+    @Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
+    @Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
+    @Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
+    @Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
+    @Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
+
+    @Override public String id() {
+        return segmentId;
+    }
+
+    public static class Builder implements StorageBuilder<SegmentRecord> {
+
+        @Override public Map<String, Object> data2Map(SegmentRecord storageData) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(SEGMENT_ID, storageData.getSegmentId());
+            map.put(TRACE_ID, storageData.getTraceId());
+            map.put(SERVICE_ID, storageData.getServiceId());
+            map.put(ENDPOINT_NAME, storageData.getEndpointName());
+            map.put(START_TIME, storageData.getStartTime());
+            map.put(END_TIME, storageData.getEndTime());
+            map.put(LATENCY, storageData.getLatency());
+            map.put(IS_ERROR, storageData.getIsError());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
+                map.put(DATA_BINARY, Const.EMPTY_STRING);
+            } else {
+                map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
+            }
+            return map;
+        }
+
+        @Override public SegmentRecord map2Data(Map<String, Object> dbMap) {
+            SegmentRecord record = new SegmentRecord();
+            record.setSegmentId((String)dbMap.get(SEGMENT_ID));
+            record.setTraceId((String)dbMap.get(TRACE_ID));
+            record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+            record.setEndpointName((String)dbMap.get(ENDPOINT_NAME));
+            record.setStartTime(((Number)dbMap.get(START_TIME)).longValue());
+            record.setEndTime(((Number)dbMap.get(END_TIME)).longValue());
+            record.setLatency(((Number)dbMap.get(LATENCY)).intValue());
+            record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
+            record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) {
+                record.setDataBinary(new byte[] {});
+            } else {
+                record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
+            }
+            return record;
+        }
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java
index f1d52a8..0973a63 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java
@@ -16,28 +16,18 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.analysis.record;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
 
 /**
  * @author peng-yongsheng
  */
-public interface Collection<Data> {
-
-    void reading();
-
-    boolean isReading();
-
-    void writing();
-
-    boolean isWriting();
-
-    void clear();
-
-    int size();
-
-    void finishReading();
+public abstract class Record implements StorageData {
 
-    void finishWriting();
+    public static final String TIME_BUCKET = "time_bucket";
 
-    Data collection();
+    @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordType.java
similarity index 77%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordType.java
index ce70081..880477c 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordType.java
@@ -16,17 +16,14 @@
  *
  */
 
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.analysis.record.annotation;
+
+import java.lang.annotation.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface SpanListener {
-    void build();
-
-    boolean containsPoint(Point point);
-
-    enum Point {
-        Entry, Exit, Local, First, GlobalTraceIds
-    }
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RecordType {
 }
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordTypeListener.java
similarity index 53%
copy from oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordTypeListener.java
index 8faaffa..6eb0d07 100644
--- a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordTypeListener.java
@@ -16,40 +16,29 @@
  *
  */
 
-package org.apache.skywalking.oap.server.library.util;
+package org.apache.skywalking.oap.server.core.analysis.record.annotation;
 
-import java.util.*;
+import java.lang.annotation.Annotation;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
+import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 
 /**
  * @author peng-yongsheng
  */
-public class CollectionUtils {
+public class RecordTypeListener implements AnnotationListener {
 
-    public static boolean isEmpty(Map map) {
-        return map == null || map.size() == 0;
-    }
-
-    public static boolean isEmpty(List list) {
-        return list == null || list.size() == 0;
-    }
-
-    public static boolean isEmpty(Set set) {
-        return set == null || set.size() == 0;
-    }
-
-    public static boolean isNotEmpty(List list) {
-        return !isEmpty(list);
-    }
+    private final ModuleManager moduleManager;
 
-    public static boolean isNotEmpty(Set set) {
-        return !isEmpty(set);
+    public RecordTypeListener(ModuleManager moduleManager) {
+        this.moduleManager = moduleManager;
     }
 
-    public static boolean isNotEmpty(Map map) {
-        return !isEmpty(map);
+    @Override public Class<? extends Annotation> annotation() {
+        return RecordType.class;
     }
 
-    public static <T> boolean isNotEmpty(T[] array) {
-        return array != null && array.length > 0;
+    @Override public void notify(Class aClass) {
+        RecordProcess.INSTANCE.create(moduleManager, aClass);
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 8d0bf88..15ff40b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -71,7 +71,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
             }
         }
 
-        mergeDataCache.getLast().collection().forEach((Indicator key, Indicator data) -> {
+        mergeDataCache.getLast().collection().forEach(data -> {
             if (logger.isDebugEnabled()) {
                 logger.debug(data.toString());
             }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index bca9752..6a614a0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -19,9 +19,9 @@
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
@@ -31,48 +31,28 @@ import static java.util.Objects.nonNull;
 /**
  * @author peng-yongsheng
  */
-public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
+public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, MergeDataCache<Indicator>> {
 
     private static final Logger logger = LoggerFactory.getLogger(IndicatorPersistentWorker.class);
 
     private final String modelName;
     private final MergeDataCache<Indicator> mergeDataCache;
-    private final IBatchDAO batchDAO;
     private final IIndicatorDAO indicatorDAO;
-    private final int blockBatchPersistenceSize;
     private final AbstractWorker<Indicator> nextWorker;
 
     IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
         IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextWorker) {
-        super(workerId);
+        super(moduleManager, workerId, batchSize);
         this.modelName = modelName;
-        this.blockBatchPersistenceSize = batchSize;
         this.mergeDataCache = new MergeDataCache<>();
-        this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
         this.indicatorDAO = indicatorDAO;
         this.nextWorker = nextWorker;
     }
 
-    public final Window<MergeDataCollection<Indicator>> getCache() {
+    @Override public MergeDataCache<Indicator> getCache() {
         return mergeDataCache;
     }
 
-    @Override public final void in(Indicator input) {
-        if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
-            try {
-                if (getCache().trySwitchPointer()) {
-                    getCache().switchPointer();
-
-                    List<?> collection = buildBatchCollection();
-                    batchDAO.batchPersistence(collection);
-                }
-            } finally {
-                getCache().trySwitchPointerFinally();
-            }
-        }
-        cacheData(input);
-    }
-
     public boolean flushAndSwitch() {
         boolean isSwitch;
         try {
@@ -85,29 +65,9 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
         return isSwitch;
     }
 
-    public final List<?> buildBatchCollection() {
-        List<?> batchCollection = new LinkedList<>();
-        try {
-            while (getCache().getLast().isWriting()) {
-                try {
-                    Thread.sleep(10);
-                } catch (InterruptedException e) {
-                    logger.warn("thread wake up");
-                }
-            }
-
-            if (getCache().getLast().collection() != null) {
-                batchCollection = prepareBatch(getCache().getLast());
-            }
-        } finally {
-            getCache().finishReadingLast();
-        }
-        return batchCollection;
-    }
-
-    private List<Object> prepareBatch(MergeDataCollection<Indicator> collection) {
+    @Override public List<Object> prepareBatch(MergeDataCache<Indicator> cache) {
         List<Object> batchCollection = new LinkedList<>();
-        collection.collection().forEach((id, data) -> {
+        cache.getLast().collection().forEach(data -> {
             Indicator dbData = null;
             try {
                 dbData = indicatorDAO.get(modelName, data);
@@ -131,7 +91,7 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
         return batchCollection;
     }
 
-    private void cacheData(Indicator input) {
+    @Override public void cacheData(Indicator input) {
         mergeDataCache.writing();
         if (mergeDataCache.containsKey(input)) {
             Indicator indicator = mergeDataCache.get(input);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
similarity index 50%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index bca9752..d731c73 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -19,46 +19,30 @@
 package org.apache.skywalking.oap.server.core.analysis.worker;
 
 import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.data.Window;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
-import static java.util.Objects.nonNull;
-
 /**
  * @author peng-yongsheng
  */
-public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
+public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> {
 
-    private static final Logger logger = LoggerFactory.getLogger(IndicatorPersistentWorker.class);
+    private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
 
-    private final String modelName;
-    private final MergeDataCache<Indicator> mergeDataCache;
+    private final int batchSize;
     private final IBatchDAO batchDAO;
-    private final IIndicatorDAO indicatorDAO;
-    private final int blockBatchPersistenceSize;
-    private final AbstractWorker<Indicator> nextWorker;
 
-    IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
-        IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextWorker) {
+    PersistenceWorker(ModuleManager moduleManager, int workerId, int batchSize) {
         super(workerId);
-        this.modelName = modelName;
-        this.blockBatchPersistenceSize = batchSize;
-        this.mergeDataCache = new MergeDataCache<>();
+        this.batchSize = batchSize;
         this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
-        this.indicatorDAO = indicatorDAO;
-        this.nextWorker = nextWorker;
-    }
-
-    public final Window<MergeDataCollection<Indicator>> getCache() {
-        return mergeDataCache;
     }
 
-    @Override public final void in(Indicator input) {
-        if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
+    @Override public final void in(INPUT input) {
+        if (getCache().currentCollectionSize() >= batchSize) {
             try {
                 if (getCache().trySwitchPointer()) {
                     getCache().switchPointer();
@@ -73,6 +57,10 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
         cacheData(input);
     }
 
+    public abstract void cacheData(INPUT input);
+
+    public abstract CACHE getCache();
+
     public boolean flushAndSwitch() {
         boolean isSwitch;
         try {
@@ -85,6 +73,8 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
         return isSwitch;
     }
 
+    public abstract List<Object> prepareBatch(CACHE cache);
+
     public final List<?> buildBatchCollection() {
         List<?> batchCollection = new LinkedList<>();
         try {
@@ -97,51 +87,11 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
             }
 
             if (getCache().getLast().collection() != null) {
-                batchCollection = prepareBatch(getCache().getLast());
+                batchCollection = prepareBatch(getCache());
             }
         } finally {
             getCache().finishReadingLast();
         }
         return batchCollection;
     }
-
-    private List<Object> prepareBatch(MergeDataCollection<Indicator> collection) {
-        List<Object> batchCollection = new LinkedList<>();
-        collection.collection().forEach((id, data) -> {
-            Indicator dbData = null;
-            try {
-                dbData = indicatorDAO.get(modelName, data);
-            } catch (Throwable t) {
-                logger.error(t.getMessage(), t);
-            }
-            try {
-                if (nonNull(dbData)) {
-                    data.combine(dbData);
-                    batchCollection.add(indicatorDAO.prepareBatchUpdate(modelName, data));
-                } else {
-                    batchCollection.add(indicatorDAO.prepareBatchInsert(modelName, data));
-                }
-
-                nextWorker.in(data);
-            } catch (Throwable t) {
-                logger.error(t.getMessage(), t);
-            }
-        });
-
-        return batchCollection;
-    }
-
-    private void cacheData(Indicator input) {
-        mergeDataCache.writing();
-        if (mergeDataCache.containsKey(input)) {
-            Indicator indicator = mergeDataCache.get(input);
-            indicator.combine(input);
-            indicator.calculate();
-        } else {
-            input.calculate();
-            mergeDataCache.put(input);
-        }
-
-        mergeDataCache.finishWriting();
-    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
new file mode 100644
index 0000000..ea8a47d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDataCache<Record>> {
+
+    private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);
+
+    private final String modelName;
+    private final NonMergeDataCache<Record> nonMergeDataCache;
+    private final IRecordDAO recordDAO;
+
+    RecordPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
+        IRecordDAO recordDAO) {
+        super(moduleManager, workerId, batchSize);
+        this.modelName = modelName;
+        this.nonMergeDataCache = new NonMergeDataCache<>();
+        this.recordDAO = recordDAO;
+    }
+
+    @Override public NonMergeDataCache<Record> getCache() {
+        return nonMergeDataCache;
+    }
+
+    @Override public List<Object> prepareBatch(NonMergeDataCache<Record> cache) {
+        List<Object> batchCollection = new LinkedList<>();
+        cache.getLast().collection().forEach(record -> {
+            try {
+                batchCollection.add(recordDAO.prepareBatchInsert(modelName, record));
+            } catch (Throwable t) {
+                logger.error(t.getMessage(), t);
+            }
+        });
+        return batchCollection;
+    }
+
+    @Override public void cacheData(Record input) {
+        nonMergeDataCache.writing();
+        nonMergeDataCache.add(input);
+        nonMergeDataCache.finishWriting();
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java
new file mode 100644
index 0000000..838136d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum RecordProcess {
+    INSTANCE;
+
+    private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();
+
+    public void in(Record record) {
+        workers.get(record.getClass()).in(record);
+    }
+
+    @Getter private List<RecordPersistentWorker> persistentWorkers = new ArrayList<>();
+
+    public void create(ModuleManager moduleManager, Class<? extends Record> recordClass) {
+        String modelName = StorageEntityAnnotationUtils.getModelName(recordClass);
+        Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(recordClass);
+
+        StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).getService(StorageDAO.class);
+        IRecordDAO recordDAO;
+        try {
+            recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
+        } catch (InstantiationException | IllegalAccessException e) {
+            throw new UnexpectedException("");
+        }
+
+        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
+            1000, moduleManager, recordDAO);
+        WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+        persistentWorkers.add(persistentWorker);
+        workers.put(recordClass, persistentWorker);
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
index 396ecab..e77d036 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
@@ -24,5 +24,6 @@ package org.apache.skywalking.oap.server.core.source;
 public enum Scope {
     All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
     ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
-    ServiceComponent, ServiceMapping
+    ServiceComponent, ServiceMapping,
+    Segment
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
similarity index 56%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
index 396ecab..b669171 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
@@ -18,11 +18,30 @@
 
 package org.apache.skywalking.oap.server.core.source;
 
+import lombok.*;
+import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
+
 /**
  * @author peng-yongsheng
  */
-public enum Scope {
-    All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
-    ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
-    ServiceComponent, ServiceMapping
+@SourceType
+public class Segment extends Source {
+
+    @Override public Scope scope() {
+        return Scope.Segment;
+    }
+
+    @Override public String getEntityId() {
+        return segmentId;
+    }
+
+    @Setter @Getter private String segmentId;
+    @Setter @Getter private String traceId;
+    @Setter @Getter private int serviceId;
+    @Setter @Getter private String endpointName;
+    @Setter @Getter private long startTime;
+    @Setter @Getter private long endTime;
+    @Setter @Getter private int latency;
+    @Setter @Getter private int isError;
+    @Setter @Getter private byte[] dataBinary;
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
similarity index 70%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
index f1d52a8..d5e156f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
@@ -16,28 +16,17 @@
  *
  */
 
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
 
 /**
  * @author peng-yongsheng
  */
-public interface Collection<Data> {
-
-    void reading();
-
-    boolean isReading();
-
-    void writing();
-
-    boolean isWriting();
-
-    void clear();
-
-    int size();
-
-    void finishReading();
+public interface IRecordDAO<INSERT> extends DAO {
 
-    void finishWriting();
+    INSERT prepareBatchInsert(String modelName, Record record) throws IOException;
 
-    Data collection();
+    void deleteHistory(String modelName, Long timeBucketBefore);
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 7271f04..4d8ebd7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.storage;
 import java.util.*;
 import java.util.concurrent.*;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.core.analysis.worker.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.slf4j.*;
 
@@ -65,7 +65,11 @@ public enum PersistenceTimer {
         long startTime = System.currentTimeMillis();
         try {
             List batchAllCollection = new LinkedList();
-            IndicatorProcess.INSTANCE.getPersistentWorkers().forEach(worker -> {
+            List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
+            persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
+            persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
+
+            persistenceWorkers.forEach(worker -> {
                 if (logger.isDebugEnabled()) {
                     logger.debug("extract {} worker data and save", worker.getClass().getName());
                 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
index 187cd78..ddb348e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.core.storage;
 
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.library.module.Service;
 
@@ -30,4 +31,6 @@ public interface StorageDAO extends Service {
     IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator> storageBuilder);
 
     IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder);
+
+    IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder);
 }
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
index 8faaffa..c3b657c 100644
--- a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
@@ -52,4 +52,12 @@ public class CollectionUtils {
     public static <T> boolean isNotEmpty(T[] array) {
         return array != null && array.length > 0;
     }
+
+    public static boolean isEmpty(byte[] array) {
+        return array == null || array.length == 0;
+    }
+
+    public static boolean isNotEmpty(byte[] array) {
+        return !isEmpty(array);
+    }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index 266e85d..83065ea 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
 import org.apache.skywalking.oap.server.receiver.trace.provider.handler.TraceSegmentServiceHandler;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.*;
 import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
 
@@ -60,6 +61,7 @@ public class TraceModuleProvider extends ModuleProvider {
         listenerManager.add(new MultiScopesSpanListener.Factory());
         listenerManager.add(new ServiceComponentSpanListener.Factory());
         listenerManager.add(new ServiceMappingSpanListener.Factory());
+        listenerManager.add(new SegmentSpanListener.Factory());
 
         GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
         try {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 78c6a78..d07c4a5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -78,9 +78,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
                 if (logger.isDebugEnabled()) {
                     logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
                 }
-
                 notifyListenerToBuild();
-                buildSegment(segmentCoreInfo.getSegmentId(), segmentDecorator.toByteArray());
                 return true;
             }
         } catch (Throwable e) {
@@ -111,6 +109,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
         segmentCoreInfo.setSegmentId(segmentIdBuilder.toString());
         segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
         segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
+        segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray());
 
         for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
             SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
@@ -159,13 +158,6 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
         return true;
     }
 
-    private void buildSegment(String id, byte[] dataBinary) {
-//        Segment segment = new Segment();
-//        segment.setId(id);
-//        segment.setDataBinary(dataBinary);
-//        segment.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
-    }
-
     private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
         if (logger.isDebugEnabled()) {
             logger.debug("push to segment buffer write worker, id: {}", id);
@@ -215,7 +207,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
 
     private void notifyGlobalsListener(UniqueId uniqueId) {
         spanListeners.forEach(listener -> {
-            if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
+            if (listener.containsPoint(SpanListener.Point.TraceIds)) {
                 ((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
             }
         });
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
index 78817ea..a0c0b7f 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
@@ -18,80 +18,20 @@
 
 package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
 
+import lombok.*;
+
 /**
  * @author peng-yongsheng
  */
+@Getter
+@Setter
 public class SegmentCoreInfo {
     private String segmentId;
-    private String traceId;
     private int applicationId;
     private int applicationInstanceId;
     private long startTime;
     private long endTime;
     private boolean isError;
     private long minuteTimeBucket;
-
-    public String getSegmentId() {
-        return segmentId;
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.segmentId = segmentId;
-    }
-
-    public int getApplicationId() {
-        return applicationId;
-    }
-
-    public void setApplicationId(int applicationId) {
-        this.applicationId = applicationId;
-    }
-
-    public int getApplicationInstanceId() {
-        return applicationInstanceId;
-    }
-
-    public void setApplicationInstanceId(int applicationInstanceId) {
-        this.applicationInstanceId = applicationInstanceId;
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-    }
-
-    public long getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(long endTime) {
-        this.endTime = endTime;
-    }
-
-    public boolean isError() {
-        return isError;
-    }
-
-    public void setError(boolean error) {
-        isError = error;
-    }
-
-    public long getMinuteTimeBucket() {
-        return minuteTimeBucket;
-    }
-
-    public void setMinuteTimeBucket(long minuteTimeBucket) {
-        this.minuteTimeBucket = minuteTimeBucket;
-    }
-
-    public String getTraceId() {
-        return traceId;
-    }
-
-    public void setTraceId(String traceId) {
-        this.traceId = traceId;
-    }
+    private byte[] dataBinary;
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
index ce70081..08e3b80 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
@@ -27,6 +27,6 @@ public interface SpanListener {
     boolean containsPoint(Point point);
 
     enum Point {
-        Entry, Exit, Local, First, GlobalTraceIds
+        Entry, Exit, Local, First, TraceIds
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
new file mode 100644
index 0000000..d3dceb8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment;
+
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener, GlobalTraceIdsListener {
+
+    private static final Logger logger = LoggerFactory.getLogger(SegmentSpanListener.class);
+
+    private final SourceReceiver sourceReceiver;
+    private final Segment segment = new Segment();
+    private final EndpointInventoryCache serviceNameCacheService;
+    private int entryEndpointId = 0;
+    private int firstEndpointId = 0;
+
+    private SegmentSpanListener(ModuleManager moduleManager) {
+        this.sourceReceiver = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
+        this.serviceNameCacheService = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
+    }
+
+    @Override public boolean containsPoint(Point point) {
+        return Point.First.equals(point) || Point.Entry.equals(point) || Point.TraceIds.equals(point);
+    }
+
+    @Override
+    public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
+        long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
+
+        segment.setSegmentId(segmentCoreInfo.getSegmentId());
+        segment.setSegmentId(segmentCoreInfo.getSegmentId());
+        segment.setServiceId(segmentCoreInfo.getApplicationId());
+        segment.setLatency((int)(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime()));
+        segment.setStartTime(segmentCoreInfo.getStartTime());
+        segment.setEndTime(segmentCoreInfo.getEndTime());
+        segment.setIsError(BooleanUtils.booleanToValue(segmentCoreInfo.isError()));
+        segment.setTimeBucket(timeBucket);
+        segment.setDataBinary(segmentCoreInfo.getDataBinary());
+
+        firstEndpointId = spanDecorator.getOperationNameId();
+    }
+
+    @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
+        entryEndpointId = spanDecorator.getOperationNameId();
+    }
+
+    @Override public void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo) {
+        StringBuilder traceIdBuilder = new StringBuilder();
+        for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
+            if (i == 0) {
+                traceIdBuilder.append(uniqueId.getIdPartsList().get(i));
+            } else {
+                traceIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i));
+            }
+        }
+        segment.setTraceId(traceIdBuilder.toString());
+    }
+
+    @Override public void build() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("segment duration listener build");
+        }
+
+        if (entryEndpointId == 0) {
+            segment.setEndpointName(serviceNameCacheService.get(firstEndpointId).getName());
+        } else {
+            segment.setEndpointName(serviceNameCacheService.get(entryEndpointId).getName());
+        }
+
+        sourceReceiver.receive(segment);
+    }
+
+    public static class Factory implements SpanListenerFactory {
+
+        @Override public SpanListener create(ModuleManager moduleManager) {
+            return new SegmentSpanListener(moduleManager);
+        }
+    }
+}
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml
index 2da2d13..a6ce5a2 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -32,8 +32,8 @@
         <logger name="io.netty" level="INFO"/>
         <logger name="org.apache.http" level="INFO"/>
         <logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
-        <logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
-        <Root level="DEBUG">
+        <logger name="org.apache.skywalking.oap.server.core.remote" level="INFO"/>
+        <Root level="INFO">
             <AppenderRef ref="Console"/>
         </Root>
     </Loggers>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index 37c3db8..9df2148 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -37,6 +37,8 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
             return "keyword";
         } else if (IntKeyLongValueArray.class.equals(type)) {
             return "keyword";
+        } else if (byte[].class.equals(type)) {
+            return "binary";
         } else {
             throw new IllegalArgumentException("Unsupported data type: " + type.getName());
         }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
new file mode 100644
index 0000000..bec4cab
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
+
+    private final StorageBuilder<Record> storageBuilder;
+
+    public RecordEsDAO(ElasticSearchClient client, StorageBuilder<Record> storageBuilder) {
+        super(client);
+        this.storageBuilder = storageBuilder;
+    }
+
+    @Override public IndexRequest prepareBatchInsert(String modelName, Record record) throws IOException {
+        Map<String, Object> objectMap = storageBuilder.data2Map(record);
+
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+        for (String key : objectMap.keySet()) {
+            Object value = objectMap.get(key);
+            if (value instanceof StorageDataType) {
+                builder.field(key, ((StorageDataType)value).toStorageData());
+            } else {
+                builder.field(key, value);
+            }
+        }
+        builder.endObject();
+        return getClient().prepareInsert(modelName, record.id(), builder);
+    }
+
+    @Override public void deleteHistory(String modelName, Long timeBucketBefore) {
+    }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
index 44c9df0..3774d65 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
 import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.storage.*;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -39,4 +40,8 @@ public class StorageEsDAO extends EsDAO implements StorageDAO {
     @Override public IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder) {
         return new RegisterEsDAO(getClient(), storageBuilder);
     }
+
+    @Override public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
+        return new RecordEsDAO(getClient(), storageBuilder);
+    }
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 95fb094..19e616a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -19,11 +19,12 @@
 package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
 
 import java.io.IOException;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
 import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.*;
 import org.slf4j.*;
@@ -80,9 +81,9 @@ public class StorageEsInstaller extends ModelInstaller {
         Settings settings = createSettingBuilder();
         try {
             mappingBuilder = createMappingBuilder(tableDefine);
-            logger.info("mapping builder str: {}", mappingBuilder.prettyPrint());
+            logger.info("index {}'s mapping builder str: {}", tableDefine.getName(), Strings.toString(mappingBuilder.prettyPrint()));
         } catch (Exception e) {
-            logger.error("create {} index mapping builder error", tableDefine.getName());
+            logger.error("create {} index mapping builder error, error message: {}", tableDefine.getName(), e.getMessage());
         }
 
         boolean isAcknowledged;