You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/06/21 02:52:42 UTC

[incubator-skywalking] branch master updated: Accept multi entry spans in collector analysis (#1372)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new aa9a896  Accept multi entry spans in collector analysis (#1372)
aa9a896 is described below

commit aa9a89685a39e1006bb71be16a4e991eee21e0f1
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Thu Jun 21 10:52:39 2018 +0800

    Accept multi entry spans in collector analysis (#1372)
    
    * #1206
    
    1. Support list-typed attributes in streaming data.
    2. Support multiple service names in segment duration index.
    
    * Fixed the conflict.
    
    * #1206
    
    Add TODO description to notify linjiaqi.
    
    * #1206
    
    Remove @NotNull
---
 .../grpc/provider/handler/mock/ConsumerMock.java   | 109 ++++++++++---
 .../segment/SegmentDurationSpanListener.java       |  13 +-
 .../parser/define/decorator/SegmentCoreInfo.java   |   9 ++
 .../parser/provider/parser/SegmentParse.java       |  45 ++----
 .../apm/collector/core/data/AbstractData.java      | 177 +++++++++++++++++----
 .../skywalking/apm/collector/core/data/Data.java   |  16 ++
 .../{MergeOperation.java => DoubleLinkedList.java} |  13 +-
 ...{MergeOperation.java => IntegerLinkedList.java} |  13 +-
 .../{MergeOperation.java => LongLinkedList.java}   |  13 +-
 .../apm/collector/core/data/MergeOperation.java    |  12 +-
 .../apm/collector/core/data/StreamData.java        |  23 ++-
 .../{MergeOperation.java => StringLinkedList.java} |  13 +-
 .../ByteColumn.java}                               |  19 +--
 .../collector/core/data/{ => column}/Column.java   |  17 +-
 .../DoubleColumn.java}                             |  19 +--
 .../DoubleListColumn.java}                         |  19 +--
 .../IntegerColumn.java}                            |  19 +--
 .../IntegerListColumn.java}                        |  19 +--
 .../LongColumn.java}                               |  19 +--
 .../LongListColumn.java}                           |  19 +--
 .../StringColumn.java}                             |  19 +--
 .../StringListColumn.java}                         |  19 +--
 .../core/data/operator/AddMergeOperation.java      |  20 ++-
 .../core/data/operator/CoverMergeOperation.java    |  19 ++-
 .../core/data/operator/MaxMergeOperation.java      |  20 ++-
 .../core/data/operator/MinMergeOperation.java      |  20 ++-
 .../core/data/operator/NonMergeOperation.java      |  18 ++-
 .../grpc/service/GRPCRemoteDeserializeService.java |  19 +++
 .../grpc/service/GRPCRemoteSerializeService.java   |  47 ++++--
 .../src/main/proto/RemoteCommonService.proto       |  20 +++
 .../storage/table/alarm/ApplicationAlarm.java      |  32 ++--
 .../storage/table/alarm/ApplicationAlarmList.java  |  34 ++--
 .../table/alarm/ApplicationReferenceAlarm.java     |  27 ++--
 .../table/alarm/ApplicationReferenceAlarmList.java |  27 ++--
 .../storage/table/alarm/InstanceAlarm.java         |  34 ++--
 .../storage/table/alarm/InstanceAlarmList.java     |  34 ++--
 .../table/alarm/InstanceReferenceAlarm.java        |  31 ++--
 .../table/alarm/InstanceReferenceAlarmList.java    |  31 ++--
 .../storage/table/alarm/ServiceAlarm.java          |  36 ++---
 .../storage/table/alarm/ServiceAlarmList.java      |  36 ++---
 .../storage/table/alarm/ServiceReferenceAlarm.java |  35 ++--
 .../table/alarm/ServiceReferenceAlarmList.java     |  35 ++--
 .../table/application/ApplicationComponent.java    |  30 ++--
 .../table/application/ApplicationMapping.java      |  30 ++--
 .../table/application/ApplicationMetric.java       |  68 ++++----
 .../application/ApplicationReferenceMetric.java    |  73 ++++-----
 .../storage/table/global/GlobalTrace.java          |  26 ++-
 .../table/global/ResponseTimeDistribution.java     |  35 ++--
 .../storage/table/instance/InstanceMapping.java    |  32 ++--
 .../storage/table/instance/InstanceMetric.java     |  66 ++++----
 .../table/instance/InstanceReferenceMetric.java    |  69 ++++----
 .../apm/collector/storage/table/jvm/CpuMetric.java |  30 ++--
 .../apm/collector/storage/table/jvm/GCMetric.java  |  32 ++--
 .../collector/storage/table/jvm/MemoryMetric.java  |  41 ++---
 .../storage/table/jvm/MemoryPoolMetric.java        |  41 ++---
 .../storage/table/register/Application.java        |  30 ++--
 .../collector/storage/table/register/Instance.java |  33 ++--
 .../storage/table/register/NetworkAddress.java     |  32 ++--
 .../storage/table/register/ServiceName.java        |  27 ++--
 .../collector/storage/table/segment/Segment.java   |  24 ++-
 .../storage/table/segment/SegmentDuration.java     |  54 +++----
 .../storage/table/service/ServiceMetric.java       |  67 ++++----
 .../table/service/ServiceReferenceMetric.java      |  74 ++++-----
 .../apm/collector/storage/ui/trace/BasicTrace.java |  22 ++-
 .../es/dao/SegmentDurationEsPersistenceDAO.java    |   5 +-
 .../storage/es/dao/ui/SegmentDurationEsUIDAO.java  |  12 +-
 .../storage/h2/dao/ui/SegmentDurationH2UIDAO.java  |   2 +-
 .../dao/ui/SegmentDurationShardingjdbcUIDAO.java   |   3 +-
 .../ui/service/SegmentTopServiceTest.java          |   2 +-
 .../src/main/resources/ui-graphql/trace.graphqls   |   2 +-
 70 files changed, 1169 insertions(+), 1012 deletions(-)

diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java
index d83a6cd..e080d3d 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java
@@ -20,12 +20,7 @@ package org.apache.skywalking.apm.collector.agent.grpc.provider.handler.mock;
 
 import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
-import org.apache.skywalking.apm.network.proto.SpanLayer;
-import org.apache.skywalking.apm.network.proto.SpanObject;
-import org.apache.skywalking.apm.network.proto.SpanType;
-import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
-import org.apache.skywalking.apm.network.proto.UniqueId;
-import org.apache.skywalking.apm.network.proto.UpstreamSegment;
+import org.apache.skywalking.apm.network.proto.*;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 
 /**
@@ -47,20 +42,76 @@ class ConsumerMock {
         segment.setTraceSegmentId(segmentId);
         segment.setApplicationId(-1);
         segment.setApplicationInstanceId(2);
-        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
         segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan2(startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan2(startTimestamp, isPrepare));
 
         return segment.build().toByteString();
     }
 
-    private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+    private SpanObject.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.Http);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp);
+        span.setEndTime(startTimestamp + 2000);
+        span.setComponentId(ComponentsDefine.TOMCAT.getId());
+        if (isPrepare) {
+            span.setOperationName("/dubbox-case/case/dubbox-rest");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createLocalSpan(long startTimestamp, boolean isPrepare) {
         SpanObject.Builder span = SpanObject.newBuilder();
         span.setSpanId(1);
+        span.setSpanType(SpanType.Local);
+        span.setParentSpanId(0);
+        span.setStartTime(startTimestamp + 100);
+        span.setEndTime(startTimestamp + 1900);
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.Local.do");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createMqEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(2);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 110);
+        span.setEndTime(startTimestamp + 1800);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.RocketMQ");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(3);
         span.setSpanType(SpanType.Exit);
         span.setSpanLayer(SpanLayer.RPCFramework);
-        span.setParentSpanId(0);
-        span.setStartTime(startTimestamp + 10);
-        span.setEndTime(startTimestamp + 1990);
+        span.setParentSpanId(2);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 1780);
         span.setComponentId(ComponentsDefine.DUBBO.getId());
         if (isPrepare) {
             span.setPeer("172.25.0.4:20880");
@@ -73,21 +124,41 @@ class ConsumerMock {
         return span;
     }
 
-    private SpanObject.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+    private SpanObject.Builder createMqEntrySpan2(long startTimestamp, boolean isPrepare) {
         SpanObject.Builder span = SpanObject.newBuilder();
-        span.setSpanId(0);
+        span.setSpanId(4);
         span.setSpanType(SpanType.Entry);
-        span.setSpanLayer(SpanLayer.Http);
-        span.setParentSpanId(-1);
-        span.setStartTime(startTimestamp);
-        span.setEndTime(startTimestamp + 2000);
-        span.setComponentId(ComponentsDefine.TOMCAT.getId());
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 110);
+        span.setEndTime(startTimestamp + 1800);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
         if (isPrepare) {
-            span.setOperationName("/dubbox-case/case/dubbox-rest");
+            span.setOperationName("org.apache.skywalking.RocketMQ");
         } else {
             span.setOperationNameId(2);
         }
         span.setIsError(false);
         return span;
     }
+
+    private SpanObject.Builder createExitSpan2(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(5);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(4);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 1780);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        if (isPrepare) {
+            span.setPeer("172.25.0.4:20880");
+            span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        } else {
+            span.setOperationNameId(-1);
+            span.setPeerId(-1);
+        }
+        span.setIsError(false);
+        return span;
+    }
 }
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
index c763de5..30269c4 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment;
 
+import java.util.*;
 import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
@@ -39,11 +40,12 @@ public class SegmentDurationSpanListener implements FirstSpanListener, EntrySpan
 
     private final SegmentDuration segmentDuration;
     private final ServiceNameCacheService serviceNameCacheService;
-    private int entryOperationNameId = 0;
+    private Set<Integer> entryOperationNameIds;
     private int firstOperationNameId = 0;
 
     private SegmentDurationSpanListener(ModuleManager moduleManager) {
         this.segmentDuration = new SegmentDuration();
+        this.entryOperationNameIds = new HashSet<>();
         this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
     }
 
@@ -56,6 +58,7 @@ public class SegmentDurationSpanListener implements FirstSpanListener, EntrySpan
         long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
 
         segmentDuration.setId(segmentCoreInfo.getSegmentId());
+        segmentDuration.setTraceId(segmentCoreInfo.getTraceId());
         segmentDuration.setSegmentId(segmentCoreInfo.getSegmentId());
         segmentDuration.setApplicationId(segmentCoreInfo.getApplicationId());
         segmentDuration.setDuration(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime());
@@ -68,16 +71,16 @@ public class SegmentDurationSpanListener implements FirstSpanListener, EntrySpan
     }
 
     @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
-        entryOperationNameId = spanDecorator.getOperationNameId();
+        entryOperationNameIds.add(spanDecorator.getOperationNameId());
     }
 
     @Override public void build() {
         Graph<SegmentDuration> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SEGMENT_DURATION_GRAPH_ID, SegmentDuration.class);
         logger.debug("segment duration listener build");
-        if (entryOperationNameId == 0) {
-            segmentDuration.setServiceName(serviceNameCacheService.get(firstOperationNameId).getServiceName());
+        if (entryOperationNameIds.size() == 0) {
+            segmentDuration.getServiceName().add(serviceNameCacheService.get(firstOperationNameId).getServiceName());
         } else {
-            segmentDuration.setServiceName(serviceNameCacheService.get(entryOperationNameId).getServiceName());
+            entryOperationNameIds.forEach(entryOperationNameId -> segmentDuration.getServiceName().add(serviceNameCacheService.get(entryOperationNameId).getServiceName()));
         }
 
         graph.start(segmentDuration);
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java
index fb15145..48607c3 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java
@@ -23,6 +23,7 @@ package org.apache.skywalking.apm.collector.analysis.segment.parser.define.decor
  */
 public class SegmentCoreInfo {
     private String segmentId;
+    private String traceId;
     private int applicationId;
     private int applicationInstanceId;
     private long startTime;
@@ -85,4 +86,12 @@ public class SegmentCoreInfo {
     public void setMinuteTimeBucket(long minuteTimeBucket) {
         this.minuteTimeBucket = minuteTimeBucket;
     }
+
+    public String getTraceId() {
+        return traceId;
+    }
+
+    public void setTraceId(String traceId) {
+        this.traceId = traceId;
+    }
 }
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
index aaa0f83..85acfc0 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
@@ -19,32 +19,19 @@
 package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentCoreInfo;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentDecorator;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
+import java.util.*;
+import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.GraphIdDefine;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.ReferenceIdExchanger;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardization;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SpanIdExchanger;
-import org.apache.skywalking.apm.collector.core.UnexpectedException;
+import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.*;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
-import org.apache.skywalking.apm.collector.core.graph.Graph;
-import org.apache.skywalking.apm.collector.core.graph.GraphManager;
+import org.apache.skywalking.apm.collector.core.graph.*;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
-import org.apache.skywalking.apm.network.proto.SpanType;
-import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
-import org.apache.skywalking.apm.network.proto.UniqueId;
-import org.apache.skywalking.apm.network.proto.UpstreamSegment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
+import org.apache.skywalking.apm.network.proto.*;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -121,7 +108,6 @@ public class SegmentParse {
         segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
         segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
 
-        int entrySpanCount = 0;
         for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
             SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
 
@@ -136,10 +122,6 @@ public class SegmentParse {
                 }
             }
 
-            if (SpanType.Entry.equals(spanDecorator.getSpanType())) {
-                entrySpanCount++;
-            }
-
             if (segmentCoreInfo.getStartTime() > spanDecorator.getStartTime()) {
                 segmentCoreInfo.setStartTime(spanDecorator.getStartTime());
             }
@@ -147,10 +129,6 @@ public class SegmentParse {
                 segmentCoreInfo.setEndTime(spanDecorator.getEndTime());
             }
             segmentCoreInfo.setError(spanDecorator.getIsError() || segmentCoreInfo.isError());
-
-            if (entrySpanCount > 1) {
-                throw new UnexpectedException("This segment contains multiple entry span.");
-            }
         }
 
         long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime());
@@ -205,7 +183,7 @@ public class SegmentParse {
     private void notifyExitListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.Exit)) {
-                ((ExitSpanListener) listener).parseExit(spanDecorator, segmentCoreInfo);
+                ((ExitSpanListener)listener).parseExit(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -214,7 +192,7 @@ public class SegmentParse {
     private void notifyEntryListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.Entry)) {
-                ((EntrySpanListener) listener).parseEntry(spanDecorator, segmentCoreInfo);
+                ((EntrySpanListener)listener).parseEntry(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -223,7 +201,7 @@ public class SegmentParse {
     private void notifyLocalListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.Local)) {
-                ((LocalSpanListener) listener).parseLocal(spanDecorator, segmentCoreInfo);
+                ((LocalSpanListener)listener).parseLocal(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -232,7 +210,7 @@ public class SegmentParse {
     private void notifyFirstListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.First)) {
-                ((FirstSpanListener) listener).parseFirst(spanDecorator, segmentCoreInfo);
+                ((FirstSpanListener)listener).parseFirst(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -241,7 +219,7 @@ public class SegmentParse {
     private void notifyGlobalsListener(UniqueId uniqueId) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
-                ((GlobalTraceIdsListener) listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
+                ((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
             }
         });
     }
@@ -250,5 +228,4 @@ public class SegmentParse {
     private void createSpanListeners() {
         listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
     }
-
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java
index 6634743..5af3ff2 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java
@@ -18,82 +18,141 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import org.apache.skywalking.apm.collector.core.data.column.*;
+
 import static java.util.Objects.nonNull;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractData {
-    private String[] dataStrings;
-    private Long[] dataLongs;
-    private Double[] dataDoubles;
-    private Integer[] dataIntegers;
-    private byte[][] dataBytes;
-    private final Column[] stringColumns;
-    private final Column[] longColumns;
-    private final Column[] doubleColumns;
-    private final Column[] integerColumns;
-    private final Column[] byteColumns;
-
-    public AbstractData(Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
-        Column[] integerColumns, Column[] byteColumns) {
-        this.dataStrings = new String[stringColumns.length];
-        this.dataLongs = new Long[longColumns.length];
-        this.dataDoubles = new Double[doubleColumns.length];
-        this.dataIntegers = new Integer[integerColumns.length];
-        this.dataBytes = new byte[byteColumns.length][];
+public abstract class AbstractData implements RemoteData {
+    private final String[] dataStrings;
+    private final Long[] dataLongs;
+    private final Double[] dataDoubles;
+    private final Integer[] dataIntegers;
+    private final byte[][] dataBytes;
+
+    private final StringLinkedList[] dataStringLists;
+    private final LongLinkedList[] dataLongLists;
+    private final DoubleLinkedList[] dataDoubleLists;
+    private final IntegerLinkedList[] dataIntegerLists;
+
+    private final StringColumn[] stringColumns;
+    private final LongColumn[] longColumns;
+    private final IntegerColumn[] integerColumns;
+    private final DoubleColumn[] doubleColumns;
+    private final ByteColumn[] byteColumns;
+
+    private final StringListColumn[] stringListColumns;
+    private final LongListColumn[] longListColumns;
+    private final IntegerListColumn[] integerListColumns;
+    private final DoubleListColumn[] doubleListColumns;
+
+    AbstractData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns,
+        DoubleColumn[] doubleColumns, ByteColumn[] byteColumns,
+        StringListColumn[] stringListColumns,
+        LongListColumn[] longListColumns,
+        IntegerListColumn[] integerListColumns, DoubleListColumn[] doubleListColumns) {
         this.stringColumns = stringColumns;
         this.longColumns = longColumns;
-        this.doubleColumns = doubleColumns;
         this.integerColumns = integerColumns;
+        this.doubleColumns = doubleColumns;
         this.byteColumns = byteColumns;
+
+        this.stringListColumns = stringListColumns;
+        this.longListColumns = longListColumns;
+        this.integerListColumns = integerListColumns;
+        this.doubleListColumns = doubleListColumns;
+
+        this.dataStrings = new String[stringColumns.length];
+        this.dataLongs = new Long[longColumns.length];
+        this.dataIntegers = new Integer[integerColumns.length];
+        this.dataDoubles = new Double[doubleColumns.length];
+        this.dataBytes = new byte[byteColumns.length][];
+
+        this.dataStringLists = new StringLinkedList[stringListColumns.length];
+        for (int i = 0; i < this.dataStringLists.length; i++) {
+            this.dataStringLists[i] = new StringLinkedList();
+        }
+
+        this.dataLongLists = new LongLinkedList[longListColumns.length];
+        for (int i = 0; i < this.dataLongLists.length; i++) {
+            this.dataLongLists[i] = new LongLinkedList();
+        }
+
+        this.dataIntegerLists = new IntegerLinkedList[integerListColumns.length];
+        for (int i = 0; i < this.dataIntegerLists.length; i++) {
+            this.dataIntegerLists[i] = new IntegerLinkedList();
+        }
+
+        this.dataDoubleLists = new DoubleLinkedList[doubleListColumns.length];
+        for (int i = 0; i < this.dataDoubleLists.length; i++) {
+            this.dataDoubleLists[i] = new DoubleLinkedList();
+        }
     }
 
-    public final int getDataStringsCount() {
+    @Override public final int getDataStringsCount() {
         return dataStrings.length;
     }
 
-    public final int getDataLongsCount() {
+    @Override public final int getDataLongsCount() {
         return dataLongs.length;
     }
 
-    public final int getDataDoublesCount() {
+    @Override public final int getDataDoublesCount() {
         return dataDoubles.length;
     }
 
-    public final int getDataIntegersCount() {
+    @Override public final int getDataIntegersCount() {
         return dataIntegers.length;
     }
 
-    public final int getDataBytesCount() {
+    @Override public final int getDataBytesCount() {
         return dataBytes.length;
     }
 
-    public final void setDataString(int position, String value) {
+    @Override public int getDataStringListsCount() {
+        return dataStringLists.length;
+    }
+
+    @Override public int getDataLongListsCount() {
+        return dataLongLists.length;
+    }
+
+    @Override public int getDataDoubleListsCount() {
+        return dataDoubleLists.length;
+    }
+
+    @Override public int getDataIntegerListsCount() {
+        return dataIntegerLists.length;
+    }
+
+    @Override public final void setDataString(int position, String value) {
         dataStrings[position] = value;
     }
 
-    public final void setDataLong(int position, Long value) {
+    @Override public final void setDataLong(int position, Long value) {
         dataLongs[position] = value;
     }
 
-    public final void setDataDouble(int position, Double value) {
+    @Override public final void setDataDouble(int position, Double value) {
         dataDoubles[position] = value;
     }
 
-    public final void setDataInteger(int position, Integer value) {
+    @Override public final void setDataInteger(int position, Integer value) {
         dataIntegers[position] = value;
     }
 
-    public final void setDataBytes(int position, byte[] dataBytes) {
+    @Override public final void setDataBytes(int position, byte[] dataBytes) {
         this.dataBytes[position] = dataBytes;
     }
 
-    public final String getDataString(int position) {
+    @Override public final String getDataString(int position) {
         return dataStrings[position];
     }
 
-    public final Long getDataLong(int position) {
+    @Override public final Long getDataLong(int position) {
         if (position + 1 > dataLongs.length) {
             throw new IndexOutOfBoundsException();
         } else if (dataLongs[position] == null) {
@@ -103,7 +162,7 @@ public abstract class AbstractData {
         }
     }
 
-    public final Double getDataDouble(int position) {
+    @Override public final Double getDataDouble(int position) {
         if (position + 1 > dataDoubles.length) {
             throw new IndexOutOfBoundsException();
         } else if (dataDoubles[position] == null) {
@@ -113,7 +172,7 @@ public abstract class AbstractData {
         }
     }
 
-    public final Integer getDataInteger(int position) {
+    @Override public final Integer getDataInteger(int position) {
         if (position + 1 > dataIntegers.length) {
             throw new IndexOutOfBoundsException();
         } else if (dataIntegers[position] == null) {
@@ -123,10 +182,42 @@ public abstract class AbstractData {
         }
     }
 
-    public final byte[] getDataBytes(int position) {
+    @Override public final byte[] getDataBytes(int position) {
         return dataBytes[position];
     }
 
+    @Override public StringLinkedList getDataStringList(int position) {
+        if (position + 1 > dataStringLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataStringLists[position];
+        }
+    }
+
+    @Override public LongLinkedList getDataLongList(int position) {
+        if (position + 1 > dataLongLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataLongLists[position];
+        }
+    }
+
+    @Override public DoubleLinkedList getDataDoubleList(int position) {
+        if (position + 1 > dataDoubleLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataDoubleLists[position];
+        }
+    }
+
+    @Override public IntegerLinkedList getDataIntegerList(int position) {
+        if (position + 1 > dataIntegerLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataIntegerLists[position];
+        }
+    }
+
     public final void mergeAndFormulaCalculateData(AbstractData newData) {
         mergeData(newData);
         calculateFormula();
@@ -153,6 +244,22 @@ public abstract class AbstractData {
             byte[] byteData = byteColumns[i].getMergeOperation().operate(newData.getDataBytes(i), this.getDataBytes(i));
             this.dataBytes[i] = byteData;
         }
+        for (int i = 0; i < stringListColumns.length; i++) {
+            StringLinkedList stringListData = stringListColumns[i].getMergeOperation().operate(newData.getDataStringList(i), this.getDataStringList(i));
+            this.dataStringLists[i] = stringListData;
+        }
+        for (int i = 0; i < longListColumns.length; i++) {
+            LongLinkedList longListData = longListColumns[i].getMergeOperation().operate(newData.getDataLongList(i), this.getDataLongList(i));
+            this.dataLongLists[i] = longListData;
+        }
+        for (int i = 0; i < doubleListColumns.length; i++) {
+            DoubleLinkedList doubleListData = doubleListColumns[i].getMergeOperation().operate(newData.getDataDoubleList(i), this.getDataDoubleList(i));
+            this.dataDoubleLists[i] = doubleListData;
+        }
+        for (int i = 0; i < integerListColumns.length; i++) {
+            IntegerLinkedList integerListData = integerListColumns[i].getMergeOperation().operate(newData.getDataIntegerList(i), this.getDataIntegerList(i));
+            this.dataIntegerLists[i] = integerListData;
+        }
     }
 
     @SuppressWarnings("unchecked")
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java
index 8626154..540b358 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java
@@ -30,6 +30,14 @@ public interface Data {
 
     int getDataIntegersCount();
 
+    int getDataStringListsCount();
+
+    int getDataLongListsCount();
+
+    int getDataDoubleListsCount();
+
+    int getDataIntegerListsCount();
+
     int getDataBytesCount();
 
     void setDataString(int position, String value);
@@ -50,5 +58,13 @@ public interface Data {
 
     Integer getDataInteger(int position);
 
+    StringLinkedList getDataStringList(int position);
+
+    LongLinkedList getDataLongList(int position);
+
+    DoubleLinkedList getDataDoubleList(int position);
+
+    IntegerLinkedList getDataIntegerList(int position);
+
     byte[] getDataBytes(int position);
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/DoubleLinkedList.java
similarity index 73%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/DoubleLinkedList.java
index 1b3ddba..b3254ef 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/DoubleLinkedList.java
@@ -18,17 +18,10 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import java.util.LinkedList;
+
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
-
-    Integer operate(Integer newValue, Integer oldValue);
-
-    byte[] operate(byte[] newValue, byte[] oldValue);
+public class DoubleLinkedList extends LinkedList<Double> {
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/IntegerLinkedList.java
similarity index 73%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/IntegerLinkedList.java
index 1b3ddba..2374a20 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/IntegerLinkedList.java
@@ -18,17 +18,10 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import java.util.LinkedList;
+
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
-
-    Integer operate(Integer newValue, Integer oldValue);
-
-    byte[] operate(byte[] newValue, byte[] oldValue);
+public class IntegerLinkedList extends LinkedList<Integer> {
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/LongLinkedList.java
similarity index 73%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/LongLinkedList.java
index 1b3ddba..88dd6d9 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/LongLinkedList.java
@@ -18,17 +18,10 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import java.util.LinkedList;
+
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
-
-    Integer operate(Integer newValue, Integer oldValue);
-
-    byte[] operate(byte[] newValue, byte[] oldValue);
+public class LongLinkedList extends LinkedList<Long> {
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
index 1b3ddba..5ebc927 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
@@ -26,9 +26,17 @@ public interface MergeOperation extends Operation {
 
     Long operate(Long newValue, Long oldValue);
 
-    Double operate(Double newValue, Double oldValue);
-
     Integer operate(Integer newValue, Integer oldValue);
 
+    Double operate(Double newValue, Double oldValue);
+
     byte[] operate(byte[] newValue, byte[] oldValue);
+
+    StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue);
+
+    LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue);
+
+    IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue);
+
+    DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue);
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
index faf127e..ece2da9 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
@@ -18,12 +18,13 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class StreamData extends AbstractData implements RemoteData, QueueData {
+public abstract class StreamData extends AbstractData implements QueueData {
 
     private EndOfBatchContext endOfBatchContext;
 
@@ -35,9 +36,23 @@ public abstract class StreamData extends AbstractData implements RemoteData, Que
         this.endOfBatchContext = context;
     }
 
-    public StreamData(Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
-        Column[] integerColumns, Column[] byteColumns) {
-        super(stringColumns, longColumns, doubleColumns, integerColumns, byteColumns);
+    public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns,
+        DoubleColumn[] doubleColumns, StringListColumn[] stringListColumns,
+        LongListColumn[] longListColumns,
+        IntegerListColumn[] integerListColumns, DoubleListColumn[] doubleListColumns) {
+        super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], stringListColumns, longListColumns, integerListColumns, doubleListColumns);
+    }
+
+    public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns) {
+        super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+    }
+
+    public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns,
+        ByteColumn[] byteColumns) {
+        super(stringColumns, longColumns, integerColumns, doubleColumns, byteColumns, new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
     }
 
     @Override public final String selectKey() {
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StringLinkedList.java
similarity index 73%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StringLinkedList.java
index 1b3ddba..7293e67 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StringLinkedList.java
@@ -18,17 +18,10 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import java.util.LinkedList;
+
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
-
-    Integer operate(Integer newValue, Integer oldValue);
-
-    byte[] operate(byte[] newValue, byte[] oldValue);
+public class StringLinkedList extends LinkedList<String> {
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/ByteColumn.java
similarity index 64%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/ByteColumn.java
index 1b3ddba..2063af2 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/ByteColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class ByteColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public ByteColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public ByteColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Column.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/Column.java
similarity index 74%
rename from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Column.java
rename to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/Column.java
index 3a651d2..d7fee69 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Column.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/Column.java
@@ -16,37 +16,40 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public class Column {
+class Column {
     private final ColumnName columnName;
     private final MergeOperation mergeOperation;
     private final FormulaOperation formulaOperation;
 
-    public Column(ColumnName columnName, MergeOperation mergeOperation) {
+    Column(ColumnName columnName, MergeOperation mergeOperation) {
         this.columnName = columnName;
         this.mergeOperation = mergeOperation;
         this.formulaOperation = null;
     }
 
-    public Column(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+    Column(ColumnName columnName, MergeOperation mergeOperation,
+        FormulaOperation formulaOperation) {
         this.columnName = columnName;
         this.mergeOperation = mergeOperation;
         this.formulaOperation = formulaOperation;
     }
 
-    public ColumnName getColumnName() {
+    public final ColumnName getColumnName() {
         return columnName;
     }
 
-    MergeOperation getMergeOperation() {
+    public final MergeOperation getMergeOperation() {
         return mergeOperation;
     }
 
-    FormulaOperation getFormulaOperation() {
+    public final FormulaOperation getFormulaOperation() {
         return formulaOperation;
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleColumn.java
similarity index 64%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleColumn.java
index 1b3ddba..bc5182c 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class DoubleColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public DoubleColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public DoubleColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleListColumn.java
similarity index 63%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleListColumn.java
index 1b3ddba..f07da95 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleListColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class DoubleListColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public DoubleListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public DoubleListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerColumn.java
similarity index 64%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerColumn.java
index 1b3ddba..6e734ce 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class IntegerColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public IntegerColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public IntegerColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerListColumn.java
similarity index 63%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerListColumn.java
index 1b3ddba..cc92f1b 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerListColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class IntegerListColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public IntegerListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public IntegerListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongColumn.java
similarity index 64%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongColumn.java
index 1b3ddba..2319d81 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class LongColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public LongColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public LongColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongListColumn.java
similarity index 63%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongListColumn.java
index 1b3ddba..406fc30 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongListColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class LongListColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public LongListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public LongListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringColumn.java
similarity index 64%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringColumn.java
index 1b3ddba..915e733 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class StringColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public StringColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public StringColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringListColumn.java
similarity index 63%
copy from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
copy to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringListColumn.java
index 1b3ddba..6a12de4 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringListColumn.java
@@ -16,19 +16,20 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public interface MergeOperation extends Operation {
-    String operate(String newValue, String oldValue);
-
-    Long operate(Long newValue, Long oldValue);
-
-    Double operate(Double newValue, Double oldValue);
+public class StringListColumn extends Column {
 
-    Integer operate(Integer newValue, Integer oldValue);
+    public StringListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
 
-    byte[] operate(byte[] newValue, byte[] oldValue);
+    public StringListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java
index 2b7d416..c346557 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -42,6 +42,22 @@ public class AddMergeOperation implements MergeOperation {
     }
 
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
-        throw new UnsupportedOperationException("not support byte addition operation");
+        throw new UnsupportedOperationException("not support byte array addition operation");
+    }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support string list addition operation");
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support long list addition operation");
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support integer list addition operation");
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support double list addition operation");
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java
index 5df0e1b..c49bd02 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java
@@ -18,12 +18,13 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
 public class CoverMergeOperation implements MergeOperation {
+
     @Override public String operate(String newValue, String oldValue) {
         return newValue;
     }
@@ -43,4 +44,20 @@ public class CoverMergeOperation implements MergeOperation {
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
         return newValue;
     }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        return newValue;
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        return newValue;
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        return newValue;
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        return newValue;
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java
index 49cf398..81399ca 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -54,6 +54,22 @@ public class MaxMergeOperation implements MergeOperation {
     }
 
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
-        throw new UnsupportedOperationException("not support byte maximum operation");
+        throw new UnsupportedOperationException("not support byte array maximum operation");
+    }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support string list maximum operation");
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support long list maximum operation");
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support integer list maximum operation");
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support double list maximum operation");
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java
index 3cd47ef..4bc6022 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -54,6 +54,22 @@ public class MinMergeOperation implements MergeOperation {
     }
 
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
-        throw new UnsupportedOperationException("not support byte minimum operation");
+        throw new UnsupportedOperationException("not support byte array minimum operation");
+    }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support string list minimum operation");
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support long list minimum operation");
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support integer list minimum operation");
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support double list minimum operation");
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
index 42c778c..d62d11f 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -43,4 +43,20 @@ public class NonMergeOperation implements MergeOperation {
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
         return oldValue;
     }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        return oldValue;
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        return oldValue;
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        return oldValue;
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        return oldValue;
+    }
 }
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java
index 47aecad..e2fe21a 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java
@@ -31,14 +31,33 @@ public class GRPCRemoteDeserializeService implements RemoteDeserializeService<Re
         for (int i = 0; i < remoteData.getDataStringsCount(); i++) {
             data.setDataString(i, remoteData.getDataStrings(i));
         }
+
         for (int i = 0; i < remoteData.getDataIntegersCount(); i++) {
             data.setDataInteger(i, remoteData.getDataIntegers(i));
         }
+
         for (int i = 0; i < remoteData.getDataLongsCount(); i++) {
             data.setDataLong(i, remoteData.getDataLongs(i));
         }
+
         for (int i = 0; i < remoteData.getDataDoublesCount(); i++) {
             data.setDataDouble(i, remoteData.getDataDoubles(i));
         }
+
+        for (int i = 0; i < remoteData.getDataStringListsCount(); i++) {
+            data.getDataStringList(i).addAll(remoteData.getDataStringLists(i).getValueList());
+        }
+
+        for (int i = 0; i < remoteData.getDataIntegerListsCount(); i++) {
+            data.getDataIntegerList(i).addAll(remoteData.getDataIntegerLists(i).getValueList());
+        }
+
+        for (int i = 0; i < remoteData.getDataLongListsCount(); i++) {
+            data.getDataLongList(i).addAll(remoteData.getDataLongLists(i).getValueList());
+        }
+
+        for (int i = 0; i < remoteData.getDataDoubleListsCount(); i++) {
+            data.getDataDoubleList(i).addAll(remoteData.getDataDoubleLists(i).getValueList());
+        }
     }
 }
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java
index ea7396e..207bbdc 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java
@@ -18,9 +18,8 @@
 
 package org.apache.skywalking.apm.collector.remote.grpc.service;
 
-import org.apache.skywalking.apm.collector.core.util.Const;
-import org.apache.skywalking.apm.collector.core.util.StringUtils;
-import org.apache.skywalking.apm.collector.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.apm.collector.core.util.*;
+import org.apache.skywalking.apm.collector.remote.grpc.proto.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteSerializeService;
 
 /**
@@ -29,23 +28,51 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteSerializeService
 public class GRPCRemoteSerializeService implements RemoteSerializeService<RemoteData.Builder> {
 
     @Override public RemoteData.Builder serialize(org.apache.skywalking.apm.collector.core.data.RemoteData data) {
-        RemoteData.Builder builder = RemoteData.newBuilder();
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
         for (int i = 0; i < data.getDataStringsCount(); i++) {
             if (StringUtils.isNotEmpty(data.getDataString(i))) {
-                builder.addDataStrings(data.getDataString(i));
+                remoteBuilder.addDataStrings(data.getDataString(i));
             } else {
-                builder.addDataStrings(Const.EMPTY_STRING);
+                remoteBuilder.addDataStrings(Const.EMPTY_STRING);
             }
         }
+
         for (int i = 0; i < data.getDataIntegersCount(); i++) {
-            builder.addDataIntegers(data.getDataInteger(i));
+            remoteBuilder.addDataIntegers(data.getDataInteger(i));
         }
+
         for (int i = 0; i < data.getDataLongsCount(); i++) {
-            builder.addDataLongs(data.getDataLong(i));
+            remoteBuilder.addDataLongs(data.getDataLong(i));
         }
+
         for (int i = 0; i < data.getDataDoublesCount(); i++) {
-            builder.addDataDoubles(data.getDataDouble(i));
+            remoteBuilder.addDataDoubles(data.getDataDouble(i));
+        }
+
+        for (int i = 0; i < data.getDataStringListsCount(); i++) {
+            StringList.Builder stringList = StringList.newBuilder();
+            data.getDataStringList(i).forEach(stringList::addValue);
+            remoteBuilder.addDataStringLists(stringList);
+        }
+
+        for (int i = 0; i < data.getDataLongListsCount(); i++) {
+            LongList.Builder longList = LongList.newBuilder();
+            data.getDataLongList(i).forEach(longList::addValue);
+            remoteBuilder.addDataLongLists(longList);
+        }
+
+        for (int i = 0; i < data.getDataIntegerListsCount(); i++) {
+            IntegerList.Builder integerList = IntegerList.newBuilder();
+            data.getDataIntegerList(i).forEach(integerList::addValue);
+            remoteBuilder.addDataIntegerLists(integerList);
         }
-        return builder;
+
+        for (int i = 0; i < data.getDataDoubleListsCount(); i++) {
+            DoubleList.Builder doubleList = DoubleList.newBuilder();
+            data.getDataDoubleList(i).forEach(doubleList::addValue);
+            remoteBuilder.addDataDoubleLists(doubleList);
+        }
+
+        return remoteBuilder;
     }
 }
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto
index 7b36435..04dcf46 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto
@@ -39,7 +39,27 @@ message RemoteData {
     repeated double dataDoubles = 3;
     repeated int32 dataIntegers = 4;
     repeated bool dataBooleans = 5;
+    repeated StringList dataStringLists = 6;
+    repeated LongList dataLongLists = 7;
+    repeated DoubleList dataDoubleLists = 8;
+    repeated IntegerList dataIntegerLists = 9;
 }
 
 message Empty {
 }
+
+message StringList {
+    repeated string value = 1;
+}
+
+message LongList {
+    repeated int64 value = 1;
+}
+
+message DoubleList {
+    repeated double value = 1;
+}
+
+message IntegerList {
+    repeated int32 value = 1;
+}
\ No newline at end of file
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
index 44bd3c5..af918f3 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,27 +28,23 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ApplicationAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationAlarmTable.ID, new NonMergeOperation()),
-        new Column(ApplicationAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationAlarmTable.APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
index 3561e57..d9627cc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,28 +28,24 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ApplicationAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.METRIC_ID, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationAlarmListTable.METRIC_ID, new NonMergeOperation()),
+        new StringColumn(ApplicationAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
index 8cdf2bb..8485720 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,28 +28,24 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ApplicationReferenceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationReferenceAlarmTable.ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationReferenceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
index 61ca181..6ff140a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,28 +28,24 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ApplicationReferenceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationReferenceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationReferenceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
index e4e67aa..cd3ab1d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,28 +28,24 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class InstanceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceAlarmTable.ID, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
index 37fdda5..ef8f4bc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,28 +28,24 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class InstanceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
index b8cf1af..ca4a883 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,30 +28,26 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class InstanceReferenceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceReferenceAlarmTable.ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceReferenceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
index 599669d..4814a0b 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,30 +28,26 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class InstanceReferenceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceReferenceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceReferenceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
index 6db644e..f3dc1dc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,29 +28,25 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ServiceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceAlarmTable.ID, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
index 252bfa6..98bb5aa 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,29 +28,25 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ServiceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
index 62017fc..b4483f8 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,32 +28,28 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ServiceReferenceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceReferenceAlarmTable.ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceReferenceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.FRONT_SERVICE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.FRONT_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
index d13633f..d42b240 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,32 +28,28 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ServiceReferenceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceReferenceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceReferenceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
index f053c5d..31c44a3 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,26 +28,22 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ApplicationComponent extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationComponentTable.ID, new NonMergeOperation()),
-        new Column(ApplicationComponentTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationComponentTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationComponentTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationComponentTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationComponentTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationComponentTable.COMPONENT_ID, new CoverMergeOperation()),
-        new Column(ApplicationComponentTable.APPLICATION_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationComponentTable.COMPONENT_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationComponentTable.APPLICATION_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationComponent() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
index 249c192..811b051 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,26 +28,22 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ApplicationMapping extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationMappingTable.ID, new NonMergeOperation()),
-        new Column(ApplicationMappingTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationMappingTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationMappingTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationMappingTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationMappingTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationMappingTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(ApplicationMappingTable.MAPPING_APPLICATION_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationMappingTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationMappingTable.MAPPING_APPLICATION_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationMapping() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
index d62e5d5..694ddef 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,44 +29,41 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
  */
 public class ApplicationMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationMetricTable.ID, new NonMergeOperation()),
-        new Column(ApplicationMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ApplicationMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ApplicationMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
     };
-    private static final Column[] DOUBLE_COLUMNS = {};
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationMetricTable.APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationMetricTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
index 8124ef7..e0f994d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,47 +29,43 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
  */
 public class ApplicationReferenceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationReferenceMetricTable.ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationReferenceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
-
-        new Column(ApplicationReferenceMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+
+        new LongColumn(ApplicationReferenceMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
index 4184d24..44bb44c 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
@@ -18,35 +18,27 @@
 
 package org.apache.skywalking.apm.collector.storage.table.global;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class GlobalTrace extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(GlobalTraceTable.ID, new NonMergeOperation()),
-        new Column(GlobalTraceTable.SEGMENT_ID, new CoverMergeOperation()),
-        new Column(GlobalTraceTable.TRACE_ID, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(GlobalTraceTable.ID, new NonMergeOperation()),
+        new StringColumn(GlobalTraceTable.SEGMENT_ID, new CoverMergeOperation()),
+        new StringColumn(GlobalTraceTable.TRACE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(GlobalTraceTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(GlobalTraceTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public GlobalTrace() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
index c4e8e6d..1974509 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.global;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -31,28 +28,24 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ResponseTimeDistribution extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ResponseTimeDistributionTable.ID, new NonMergeOperation()),
-        new Column(ResponseTimeDistributionTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ResponseTimeDistributionTable.ID, new NonMergeOperation()),
+        new StringColumn(ResponseTimeDistributionTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ResponseTimeDistributionTable.TIME_BUCKET, new CoverMergeOperation()),
-        new Column(ResponseTimeDistributionTable.CALLS, new AddMergeOperation()),
-        new Column(ResponseTimeDistributionTable.ERROR_CALLS, new AddMergeOperation()),
-        new Column(ResponseTimeDistributionTable.SUCCESS_CALLS, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ResponseTimeDistributionTable.TIME_BUCKET, new CoverMergeOperation()),
+        new LongColumn(ResponseTimeDistributionTable.CALLS, new AddMergeOperation()),
+        new LongColumn(ResponseTimeDistributionTable.ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ResponseTimeDistributionTable.SUCCESS_CALLS, new AddMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ResponseTimeDistributionTable.STEP, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ResponseTimeDistributionTable.STEP, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ResponseTimeDistribution() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
index 0b4677d..5183663 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,27 +28,23 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class InstanceMapping extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceMappingTable.ID, new NonMergeOperation()),
-        new Column(InstanceMappingTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceMappingTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceMappingTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceMappingTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceMappingTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceMappingTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(InstanceMappingTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(InstanceMappingTable.ADDRESS_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceMappingTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMappingTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMappingTable.ADDRESS_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceMapping() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
index 384b943..8ffe0d8 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
@@ -18,13 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -33,43 +29,39 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
  */
 public class InstanceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceMetricTable.ID, new NonMergeOperation()),
-        new Column(InstanceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(InstanceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(InstanceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceMetricTable.SOURCE_VALUE, new CoverMergeOperation()),
-        new Column(InstanceMetricTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(InstanceMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceMetricTable.SOURCE_VALUE, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMetricTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMetricTable.INSTANCE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
index 75c26f8..822d8e7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,45 +29,41 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
  */
 public class InstanceReferenceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceReferenceMetricTable.ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceReferenceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(InstanceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java
index 269352a..36ee0ee 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java
@@ -18,39 +18,35 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class CpuMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(CpuMetricTable.ID, new NonMergeOperation()),
-        new Column(CpuMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(CpuMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(CpuMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(CpuMetricTable.TIMES, new AddMergeOperation()),
-        new Column(CpuMetricTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(CpuMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(CpuMetricTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
-        new Column(CpuMetricTable.USAGE_PERCENT, new AddMergeOperation()),
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+        new DoubleColumn(CpuMetricTable.USAGE_PERCENT, new AddMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(CpuMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(CpuMetricTable.INSTANCE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public CpuMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
index d032bd6..76d4cc6 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
@@ -18,7 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.StreamData;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
@@ -26,30 +27,25 @@ import org.apache.skywalking.apm.collector.core.data.operator.*;
  */
 public class GCMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(GCMetricTable.ID, new NonMergeOperation()),
-        new Column(GCMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(GCMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(GCMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(GCMetricTable.COUNT, new AddMergeOperation()),
-        new Column(GCMetricTable.TIMES, new AddMergeOperation()),
-        new Column(GCMetricTable.TIME_BUCKET, new CoverMergeOperation()),
-        new Column(GCMetricTable.DURATION, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(GCMetricTable.COUNT, new AddMergeOperation()),
+        new LongColumn(GCMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(GCMetricTable.TIME_BUCKET, new CoverMergeOperation()),
+        new LongColumn(GCMetricTable.DURATION, new AddMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(GCMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(GCMetricTable.PHRASE, new CoverMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(GCMetricTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(GCMetricTable.PHRASE, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public GCMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
index 444924a..b37514a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
@@ -18,45 +18,36 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MaxMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MinMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class MemoryMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(MemoryMetricTable.ID, new NonMergeOperation()),
-        new Column(MemoryMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(MemoryMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(MemoryMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(MemoryMetricTable.INIT, new MinMergeOperation()),
-        new Column(MemoryMetricTable.MAX, new MaxMergeOperation()),
-        new Column(MemoryMetricTable.USED, new AddMergeOperation()),
-        new Column(MemoryMetricTable.COMMITTED, new AddMergeOperation()),
-        new Column(MemoryMetricTable.TIMES, new AddMergeOperation()),
-        new Column(MemoryMetricTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(MemoryMetricTable.INIT, new MinMergeOperation()),
+        new LongColumn(MemoryMetricTable.MAX, new MaxMergeOperation()),
+        new LongColumn(MemoryMetricTable.USED, new AddMergeOperation()),
+        new LongColumn(MemoryMetricTable.COMMITTED, new AddMergeOperation()),
+        new LongColumn(MemoryMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(MemoryMetricTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(MemoryMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(MemoryMetricTable.IS_HEAP, new CoverMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(MemoryMetricTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(MemoryMetricTable.IS_HEAP, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public MemoryMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
index e47876b..5418136 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
@@ -18,45 +18,36 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MaxMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MinMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class MemoryPoolMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(MemoryPoolMetricTable.ID, new NonMergeOperation()),
-        new Column(MemoryPoolMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(MemoryPoolMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(MemoryPoolMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(MemoryPoolMetricTable.INIT, new MinMergeOperation()),
-        new Column(MemoryPoolMetricTable.MAX, new MaxMergeOperation()),
-        new Column(MemoryPoolMetricTable.USED, new AddMergeOperation()),
-        new Column(MemoryPoolMetricTable.COMMITTED, new AddMergeOperation()),
-        new Column(MemoryPoolMetricTable.TIMES, new AddMergeOperation()),
-        new Column(MemoryPoolMetricTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(MemoryPoolMetricTable.INIT, new MinMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.MAX, new MaxMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.USED, new AddMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.COMMITTED, new AddMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(MemoryPoolMetricTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(MemoryPoolMetricTable.POOL_TYPE, new CoverMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(MemoryPoolMetricTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(MemoryPoolMetricTable.POOL_TYPE, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public MemoryPoolMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
index 930462c..928d032 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.register;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,25 +28,19 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class Application extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationTable.ID, new NonMergeOperation()),
-        new Column(ApplicationTable.APPLICATION_CODE, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationTable.APPLICATION_CODE, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {};
-
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(ApplicationTable.ADDRESS_ID, new CoverMergeOperation()),
-        new Column(ApplicationTable.IS_ADDRESS, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationTable.ADDRESS_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationTable.IS_ADDRESS, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public Application() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
index 6e3d0b9..e8a78cb 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,31 +28,27 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class Instance extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceTable.ID, new NonMergeOperation()),
-        new Column(InstanceTable.AGENT_UUID, new CoverMergeOperation()),
-        new Column(InstanceTable.OS_INFO, new CoverMergeOperation()),
-        new Column(InstanceTable.APPLICATION_CODE, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceTable.AGENT_UUID, new CoverMergeOperation()),
+        new StringColumn(InstanceTable.OS_INFO, new CoverMergeOperation()),
+        new StringColumn(InstanceTable.APPLICATION_CODE, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceTable.REGISTER_TIME, new CoverMergeOperation()),
-        new Column(InstanceTable.HEARTBEAT_TIME, new MaxMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceTable.REGISTER_TIME, new CoverMergeOperation()),
+        new LongColumn(InstanceTable.HEARTBEAT_TIME, new MaxMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(InstanceTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(InstanceTable.ADDRESS_ID, new CoverMergeOperation()),
-        new Column(InstanceTable.IS_ADDRESS, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceTable.ADDRESS_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceTable.IS_ADDRESS, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public Instance() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
index 8e54a16..831f113 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.register;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,26 +28,20 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class NetworkAddress extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(NetworkAddressTable.ID, new NonMergeOperation()),
-        new Column(NetworkAddressTable.NETWORK_ADDRESS, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(NetworkAddressTable.ID, new NonMergeOperation()),
+        new StringColumn(NetworkAddressTable.NETWORK_ADDRESS, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(NetworkAddressTable.ADDRESS_ID, new NonMergeOperation()),
+        new IntegerColumn(NetworkAddressTable.SRC_SPAN_LAYER, new CoverMergeOperation()),
+        new IntegerColumn(NetworkAddressTable.SERVER_TYPE, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(NetworkAddressTable.ADDRESS_ID, new NonMergeOperation()),
-        new Column(NetworkAddressTable.SRC_SPAN_LAYER, new CoverMergeOperation()),
-        new Column(NetworkAddressTable.SERVER_TYPE, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public NetworkAddress() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
index 6f860b0..d04a7f0 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,28 +28,24 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
  */
 public class ServiceName extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceNameTable.ID, new NonMergeOperation()),
-        new Column(ServiceNameTable.SERVICE_NAME, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceNameTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceNameTable.SERVICE_NAME, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceNameTable.REGISTER_TIME, new NonMergeOperation()),
-        new Column(ServiceNameTable.HEARTBEAT_TIME, new MaxMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceNameTable.REGISTER_TIME, new NonMergeOperation()),
+        new LongColumn(ServiceNameTable.HEARTBEAT_TIME, new MaxMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceNameTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(ServiceNameTable.SERVICE_ID, new CoverMergeOperation()),
-        new Column(ServiceNameTable.SRC_SPAN_TYPE, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceNameTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(ServiceNameTable.SERVICE_ID, new CoverMergeOperation()),
+        new IntegerColumn(ServiceNameTable.SRC_SPAN_TYPE, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceName() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
index f9ee9f7..3a411c7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
@@ -18,35 +18,29 @@
 
 package org.apache.skywalking.apm.collector.storage.table.segment;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class Segment extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(SegmentTable.ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(SegmentTable.ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(SegmentTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(SegmentTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-    };
-
-    private static final Column[] BYTE_COLUMNS = {
-        new Column(SegmentTable.DATA_BINARY, new CoverMergeOperation()),
+    private static final ByteColumn[] BYTE_COLUMNS = {
+        new ByteColumn(SegmentTable.DATA_BINARY, new CoverMergeOperation()),
     };
 
     public Segment() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0], BYTE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java
index a309c2d..f3af686 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java
@@ -18,41 +18,39 @@
 
 package org.apache.skywalking.apm.collector.storage.table.segment;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class SegmentDuration extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(SegmentDurationTable.ID, new NonMergeOperation()),
-        new Column(SegmentDurationTable.SEGMENT_ID, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.SERVICE_NAME, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.TRACE_ID, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(SegmentDurationTable.ID, new NonMergeOperation()),
+        new StringColumn(SegmentDurationTable.SEGMENT_ID, new CoverMergeOperation()),
+        new StringColumn(SegmentDurationTable.TRACE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(SegmentDurationTable.DURATION, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.START_TIME, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.END_TIME, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(SegmentDurationTable.DURATION, new CoverMergeOperation()),
+        new LongColumn(SegmentDurationTable.START_TIME, new CoverMergeOperation()),
+        new LongColumn(SegmentDurationTable.END_TIME, new CoverMergeOperation()),
+        new LongColumn(SegmentDurationTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(SegmentDurationTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.IS_ERROR, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(SegmentDurationTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(SegmentDurationTable.IS_ERROR, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
+    private static final StringListColumn[] STRING_LIST_COLUMNS = {
+        new StringListColumn(SegmentDurationTable.SERVICE_NAME, new CoverMergeOperation()),
+    };
 
     public SegmentDuration() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0], STRING_LIST_COLUMNS, new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
     }
 
     @Override public String getId() {
@@ -79,20 +77,16 @@ public class SegmentDuration extends StreamData {
         setDataString(1, segmentId);
     }
 
-    public String getServiceName() {
+    public String getTraceId() {
         return getDataString(2);
     }
 
-    public void setServiceName(String serviceName) {
-        setDataString(2, serviceName);
-    }
-
-    public String getTraceId() {
-        return getDataString(3);
+    public void setTraceId(String traceId) {
+        setDataString(2, traceId);
     }
 
-    public void setTraceId(String traceId) {
-        setDataString(3, traceId);
+    public StringLinkedList getServiceName() {
+        return getDataStringList(0);
     }
 
     public Long getDuration() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
index fc3f6b7..91cdf74 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.service;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,44 +29,40 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
  */
 public class ServiceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceMetricTable.ID, new NonMergeOperation()),
-        new Column(ServiceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ServiceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ServiceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceMetricTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceMetricTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceMetricTable.SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceMetricTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceMetricTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceMetricTable.SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
index 638a375..e644e02 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.service;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,50 +29,47 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
  */
 public class ServiceReferenceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceReferenceMetricTable.ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceReferenceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ServiceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.FRONT_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
 
-        new Column(ServiceReferenceMetricTable.FRONT_SERVICE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
 
-        new Column(ServiceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
-
-        new Column(ServiceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java
index 0d21d3e..4f99c6f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java
@@ -18,8 +18,7 @@
 
 package org.apache.skywalking.apm.collector.storage.ui.trace;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 
 /**
  * @author peng-yongsheng
@@ -27,12 +26,17 @@ import java.util.List;
 public class BasicTrace {
 
     private String segmentId;
-    private String operationName;
+    private List<String> operationNames;
     private int duration;
     private long start;
     private Boolean isError;
     private List<String> traceIds;
 
+    public BasicTrace() {
+        this.traceIds = new LinkedList<>();
+        this.operationNames = new LinkedList<>();
+    }
+
     public String getSegmentId() {
         return segmentId;
     }
@@ -41,16 +45,8 @@ public class BasicTrace {
         this.segmentId = segmentId;
     }
 
-    public BasicTrace() {
-        this.traceIds = new LinkedList<>();
-    }
-
-    public String getOperationName() {
-        return operationName;
-    }
-
-    public void setOperationName(String operationName) {
-        this.operationName = operationName;
+    public List<String> getOperationName() {
+        return operationNames;
     }
 
     public int getDuration() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
index 30de27d..24a739d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao;
 
+import com.google.gson.Gson;
 import java.util.*;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
@@ -36,6 +37,8 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
 
     private final Logger logger = LoggerFactory.getLogger(SegmentDurationEsPersistenceDAO.class);
 
+    private final Gson gson = new Gson();
+
     public SegmentDurationEsPersistenceDAO(ElasticSearchClient client) {
         super(client);
     }
@@ -55,7 +58,7 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
         Map<String, Object> target = new HashMap<>();
         target.put(SegmentDurationTable.SEGMENT_ID.getName(), data.getSegmentId());
         target.put(SegmentDurationTable.APPLICATION_ID.getName(), data.getApplicationId());
-        target.put(SegmentDurationTable.SERVICE_NAME.getName(), data.getServiceName());
+        target.put(SegmentDurationTable.SERVICE_NAME.getName(), gson.toJson(data.getServiceName()));
         target.put(SegmentDurationTable.DURATION.getName(), data.getDuration());
         target.put(SegmentDurationTable.START_TIME.getName(), data.getStartTime());
         target.put(SegmentDurationTable.END_TIME.getName(), data.getEndTime());
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java
index e12f1e4..7e65b47 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java
@@ -18,7 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.List;
+import com.google.gson.Gson;
+import java.util.*;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.*;
 import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO;
@@ -35,6 +36,8 @@ import org.elasticsearch.search.sort.SortOrder;
  */
 public class SegmentDurationEsUIDAO extends EsDAO implements ISegmentDurationUIDAO {
 
+    private final Gson gson = new Gson();
+
     public SegmentDurationEsUIDAO(ElasticSearchClient client) {
         super(client);
     }
@@ -102,7 +105,12 @@ public class SegmentDurationEsUIDAO extends EsDAO implements ISegmentDurationUID
 
             basicTrace.setSegmentId((String)searchHit.getSource().get(SegmentDurationTable.SEGMENT_ID.getName()));
             basicTrace.setStart(((Number)searchHit.getSource().get(SegmentDurationTable.START_TIME.getName())).longValue());
-            basicTrace.setOperationName((String)searchHit.getSource().get(SegmentDurationTable.SERVICE_NAME.getName()));
+
+            String serviceNameJsonStr = (String)searchHit.getSource().get(SegmentDurationTable.SERVICE_NAME.getName());
+            if (StringUtils.isNotEmpty(serviceNameJsonStr)) {
+                List serviceNames = gson.fromJson(serviceNameJsonStr, LinkedList.class);
+                basicTrace.getOperationName().addAll(serviceNames);
+            }
             basicTrace.setDuration(((Number)searchHit.getSource().get(SegmentDurationTable.DURATION.getName())).intValue());
             basicTrace.setError(BooleanUtils.valueToBoolean(((Number)searchHit.getSource().get(SegmentDurationTable.IS_ERROR.getName())).intValue()));
             traceBrief.getTraces().add(basicTrace);
diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java
index 5f154e8..2ffddc7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java
@@ -128,7 +128,7 @@ public class SegmentDurationH2UIDAO extends H2DAO implements ISegmentDurationUID
                 basicTrace.setSegmentId(rs.getString(SegmentDurationTable.SEGMENT_ID.getName()));
                 basicTrace.setDuration(rs.getInt(SegmentDurationTable.DURATION.getName()));
                 basicTrace.setStart(rs.getLong(SegmentDurationTable.START_TIME.getName()));
-                basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
+//                basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
                 basicTrace.setError(BooleanUtils.valueToBoolean(rs.getInt(SegmentDurationTable.IS_ERROR.getName())));
                 traceBrief.getTraces().add(basicTrace);
                 cnt++;
diff --git a/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java
index 5510caa..eac02da 100644
--- a/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java
@@ -141,7 +141,8 @@ public class SegmentDurationShardingjdbcUIDAO extends ShardingjdbcDAO implements
                 basicTrace.setSegmentId(rs.getString(SegmentDurationTable.SEGMENT_ID.getName()));
                 basicTrace.setDuration(rs.getInt(SegmentDurationTable.DURATION.getName()));
                 basicTrace.setStart(rs.getLong(SegmentDurationTable.START_TIME.getName()));
-                basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
+                //TODO linjiaqi operation name was changed to contain multiple values
+                //basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
                 basicTrace.setError(BooleanUtils.valueToBoolean(rs.getInt(SegmentDurationTable.IS_ERROR.getName())));
                 traceBrief.getTraces().add(basicTrace);
                 cnt++;
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java
index d727465..c9d3f62 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java
@@ -83,7 +83,7 @@ public class SegmentTopServiceTest {
         BasicTrace basicTrace = new BasicTrace();
         basicTrace.setDuration(12);
         basicTrace.setError(false);
-        basicTrace.setOperationName("test");
+        basicTrace.getOperationName().add("test");
         basicTrace.setSegmentId("segmentId");
         basicTrace.setStart(System.currentTimeMillis());
         basicTrace.setTraceIds(Collections.singletonList("traceId"));
diff --git a/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls b/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls
index e6f336b..9bb30be 100644
--- a/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls
+++ b/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls
@@ -23,7 +23,7 @@ type TraceBrief {
 # Trace basic info
 type BasicTrace {
    segmentId: String!
-   operationName: String!
+   operationNames: [String!]!
    duration: Int!
    start: String!
    isError: Boolean