You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/06/21 02:52:41 UTC

[GitHub] wu-sheng closed pull request #1372: Accept multi entry spans in collector analysis

wu-sheng closed pull request #1372: Accept multi entry spans in collector analysis
URL: https://github.com/apache/incubator-skywalking/pull/1372
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java
index d83a6cd7b..e080d3da0 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/test/java/org/apache/skywalking/apm/collector/agent/grpc/provider/handler/mock/ConsumerMock.java
@@ -20,12 +20,7 @@
 
 import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
-import org.apache.skywalking.apm.network.proto.SpanLayer;
-import org.apache.skywalking.apm.network.proto.SpanObject;
-import org.apache.skywalking.apm.network.proto.SpanType;
-import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
-import org.apache.skywalking.apm.network.proto.UniqueId;
-import org.apache.skywalking.apm.network.proto.UpstreamSegment;
+import org.apache.skywalking.apm.network.proto.*;
 import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 
 /**
@@ -47,20 +42,76 @@ private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId
         segment.setTraceSegmentId(segmentId);
         segment.setApplicationId(-1);
         segment.setApplicationInstanceId(2);
-        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
         segment.addSpans(createEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createLocalSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan(startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan(startTimestamp, isPrepare));
+        segment.addSpans(createMqEntrySpan2(startTimestamp, isPrepare));
+        segment.addSpans(createExitSpan2(startTimestamp, isPrepare));
 
         return segment.build().toByteString();
     }
 
-    private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+    private SpanObject.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(0);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.Http);
+        span.setParentSpanId(-1);
+        span.setStartTime(startTimestamp);
+        span.setEndTime(startTimestamp + 2000);
+        span.setComponentId(ComponentsDefine.TOMCAT.getId());
+        if (isPrepare) {
+            span.setOperationName("/dubbox-case/case/dubbox-rest");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createLocalSpan(long startTimestamp, boolean isPrepare) {
         SpanObject.Builder span = SpanObject.newBuilder();
         span.setSpanId(1);
+        span.setSpanType(SpanType.Local);
+        span.setParentSpanId(0);
+        span.setStartTime(startTimestamp + 100);
+        span.setEndTime(startTimestamp + 1900);
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.Local.do");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createMqEntrySpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(2);
+        span.setSpanType(SpanType.Entry);
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 110);
+        span.setEndTime(startTimestamp + 1800);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
+        if (isPrepare) {
+            span.setOperationName("org.apache.skywalking.RocketMQ");
+        } else {
+            span.setOperationNameId(2);
+        }
+        span.setIsError(false);
+        return span;
+    }
+
+    private SpanObject.Builder createExitSpan(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(3);
         span.setSpanType(SpanType.Exit);
         span.setSpanLayer(SpanLayer.RPCFramework);
-        span.setParentSpanId(0);
-        span.setStartTime(startTimestamp + 10);
-        span.setEndTime(startTimestamp + 1990);
+        span.setParentSpanId(2);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 1780);
         span.setComponentId(ComponentsDefine.DUBBO.getId());
         if (isPrepare) {
             span.setPeer("172.25.0.4:20880");
@@ -73,21 +124,41 @@ private ByteString createSegment(long startTimestamp, UniqueId.Builder segmentId
         return span;
     }
 
-    private SpanObject.Builder createEntrySpan(long startTimestamp, boolean isPrepare) {
+    private SpanObject.Builder createMqEntrySpan2(long startTimestamp, boolean isPrepare) {
         SpanObject.Builder span = SpanObject.newBuilder();
-        span.setSpanId(0);
+        span.setSpanId(4);
         span.setSpanType(SpanType.Entry);
-        span.setSpanLayer(SpanLayer.Http);
-        span.setParentSpanId(-1);
-        span.setStartTime(startTimestamp);
-        span.setEndTime(startTimestamp + 2000);
-        span.setComponentId(ComponentsDefine.TOMCAT.getId());
+        span.setSpanLayer(SpanLayer.MQ);
+        span.setParentSpanId(1);
+        span.setStartTime(startTimestamp + 110);
+        span.setEndTime(startTimestamp + 1800);
+        span.setComponentId(ComponentsDefine.ROCKET_MQ_CONSUMER.getId());
         if (isPrepare) {
-            span.setOperationName("/dubbox-case/case/dubbox-rest");
+            span.setOperationName("org.apache.skywalking.RocketMQ");
         } else {
             span.setOperationNameId(2);
         }
         span.setIsError(false);
         return span;
     }
+
+    private SpanObject.Builder createExitSpan2(long startTimestamp, boolean isPrepare) {
+        SpanObject.Builder span = SpanObject.newBuilder();
+        span.setSpanId(5);
+        span.setSpanType(SpanType.Exit);
+        span.setSpanLayer(SpanLayer.RPCFramework);
+        span.setParentSpanId(4);
+        span.setStartTime(startTimestamp + 120);
+        span.setEndTime(startTimestamp + 1780);
+        span.setComponentId(ComponentsDefine.DUBBO.getId());
+        if (isPrepare) {
+            span.setPeer("172.25.0.4:20880");
+            span.setOperationName("org.skywaking.apm.testcase.dubbo.services.GreetService.doBusiness()");
+        } else {
+            span.setOperationNameId(-1);
+            span.setPeerId(-1);
+        }
+        span.setIsError(false);
+        return span;
+    }
 }
diff --git a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
index c763de515..30269c44d 100644
--- a/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
+++ b/apm-collector/apm-collector-analysis/analysis-metric/metric-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/metric/provider/worker/segment/SegmentDurationSpanListener.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment;
 
+import java.util.*;
 import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
@@ -39,11 +40,12 @@
 
     private final SegmentDuration segmentDuration;
     private final ServiceNameCacheService serviceNameCacheService;
-    private int entryOperationNameId = 0;
+    private Set<Integer> entryOperationNameIds;
     private int firstOperationNameId = 0;
 
     private SegmentDurationSpanListener(ModuleManager moduleManager) {
         this.segmentDuration = new SegmentDuration();
+        this.entryOperationNameIds = new HashSet<>();
         this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
     }
 
@@ -56,6 +58,7 @@ public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreI
         long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
 
         segmentDuration.setId(segmentCoreInfo.getSegmentId());
+        segmentDuration.setTraceId(segmentCoreInfo.getTraceId());
         segmentDuration.setSegmentId(segmentCoreInfo.getSegmentId());
         segmentDuration.setApplicationId(segmentCoreInfo.getApplicationId());
         segmentDuration.setDuration(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime());
@@ -68,16 +71,16 @@ public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreI
     }
 
     @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
-        entryOperationNameId = spanDecorator.getOperationNameId();
+        entryOperationNameIds.add(spanDecorator.getOperationNameId());
     }
 
     @Override public void build() {
         Graph<SegmentDuration> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SEGMENT_DURATION_GRAPH_ID, SegmentDuration.class);
         logger.debug("segment duration listener build");
-        if (entryOperationNameId == 0) {
-            segmentDuration.setServiceName(serviceNameCacheService.get(firstOperationNameId).getServiceName());
+        if (entryOperationNameIds.size() == 0) {
+            segmentDuration.getServiceName().add(serviceNameCacheService.get(firstOperationNameId).getServiceName());
         } else {
-            segmentDuration.setServiceName(serviceNameCacheService.get(entryOperationNameId).getServiceName());
+            entryOperationNameIds.forEach(entryOperationNameId -> segmentDuration.getServiceName().add(serviceNameCacheService.get(entryOperationNameId).getServiceName()));
         }
 
         graph.start(segmentDuration);
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java
index fb1514574..48607c37a 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-define/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/define/decorator/SegmentCoreInfo.java
@@ -23,6 +23,7 @@
  */
 public class SegmentCoreInfo {
     private String segmentId;
+    private String traceId;
     private int applicationId;
     private int applicationInstanceId;
     private long startTime;
@@ -85,4 +86,12 @@ public long getMinuteTimeBucket() {
     public void setMinuteTimeBucket(long minuteTimeBucket) {
         this.minuteTimeBucket = minuteTimeBucket;
     }
+
+    public String getTraceId() {
+        return traceId;
+    }
+
+    public void setTraceId(String traceId) {
+        this.traceId = traceId;
+    }
 }
diff --git a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
index aaa0f8364..85acfc026 100644
--- a/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
+++ b/apm-collector/apm-collector-analysis/analysis-segment-parser/segment-parser-provider/src/main/java/org/apache/skywalking/apm/collector/analysis/segment/parser/provider/parser/SegmentParse.java
@@ -19,32 +19,19 @@
 package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser;
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentCoreInfo;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentDecorator;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
+import java.util.*;
+import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.GraphIdDefine;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
 import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.ReferenceIdExchanger;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardization;
-import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SpanIdExchanger;
-import org.apache.skywalking.apm.collector.core.UnexpectedException;
+import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.*;
 import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
-import org.apache.skywalking.apm.collector.core.graph.Graph;
-import org.apache.skywalking.apm.collector.core.graph.GraphManager;
+import org.apache.skywalking.apm.collector.core.graph.*;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
 import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
-import org.apache.skywalking.apm.network.proto.SpanType;
-import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
-import org.apache.skywalking.apm.network.proto.UniqueId;
-import org.apache.skywalking.apm.network.proto.UpstreamSegment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
+import org.apache.skywalking.apm.network.proto.*;
+import org.slf4j.*;
 
 /**
  * @author peng-yongsheng
@@ -121,7 +108,6 @@ private boolean preBuild(List<UniqueId> traceIds, SegmentDecorator segmentDecora
         segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
         segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
 
-        int entrySpanCount = 0;
         for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
             SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
 
@@ -136,10 +122,6 @@ private boolean preBuild(List<UniqueId> traceIds, SegmentDecorator segmentDecora
                 }
             }
 
-            if (SpanType.Entry.equals(spanDecorator.getSpanType())) {
-                entrySpanCount++;
-            }
-
             if (segmentCoreInfo.getStartTime() > spanDecorator.getStartTime()) {
                 segmentCoreInfo.setStartTime(spanDecorator.getStartTime());
             }
@@ -147,10 +129,6 @@ private boolean preBuild(List<UniqueId> traceIds, SegmentDecorator segmentDecora
                 segmentCoreInfo.setEndTime(spanDecorator.getEndTime());
             }
             segmentCoreInfo.setError(spanDecorator.getIsError() || segmentCoreInfo.isError());
-
-            if (entrySpanCount > 1) {
-                throw new UnexpectedException("This segment contains multiple entry span.");
-            }
         }
 
         long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(segmentCoreInfo.getStartTime());
@@ -205,7 +183,7 @@ private void notifyListenerToBuild() {
     private void notifyExitListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.Exit)) {
-                ((ExitSpanListener) listener).parseExit(spanDecorator, segmentCoreInfo);
+                ((ExitSpanListener)listener).parseExit(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -214,7 +192,7 @@ private void notifyExitListener(SpanDecorator spanDecorator) {
     private void notifyEntryListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.Entry)) {
-                ((EntrySpanListener) listener).parseEntry(spanDecorator, segmentCoreInfo);
+                ((EntrySpanListener)listener).parseEntry(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -223,7 +201,7 @@ private void notifyEntryListener(SpanDecorator spanDecorator) {
     private void notifyLocalListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.Local)) {
-                ((LocalSpanListener) listener).parseLocal(spanDecorator, segmentCoreInfo);
+                ((LocalSpanListener)listener).parseLocal(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -232,7 +210,7 @@ private void notifyLocalListener(SpanDecorator spanDecorator) {
     private void notifyFirstListener(SpanDecorator spanDecorator) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.First)) {
-                ((FirstSpanListener) listener).parseFirst(spanDecorator, segmentCoreInfo);
+                ((FirstSpanListener)listener).parseFirst(spanDecorator, segmentCoreInfo);
             }
         });
     }
@@ -241,7 +219,7 @@ private void notifyFirstListener(SpanDecorator spanDecorator) {
     private void notifyGlobalsListener(UniqueId uniqueId) {
         spanListeners.forEach(listener -> {
             if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
-                ((GlobalTraceIdsListener) listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
+                ((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
             }
         });
     }
@@ -250,5 +228,4 @@ private void notifyGlobalsListener(UniqueId uniqueId) {
     private void createSpanListeners() {
         listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
     }
-
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java
index 663474389..5af3ff2df 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/AbstractData.java
@@ -18,82 +18,141 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import org.apache.skywalking.apm.collector.core.data.column.*;
+
 import static java.util.Objects.nonNull;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class AbstractData {
-    private String[] dataStrings;
-    private Long[] dataLongs;
-    private Double[] dataDoubles;
-    private Integer[] dataIntegers;
-    private byte[][] dataBytes;
-    private final Column[] stringColumns;
-    private final Column[] longColumns;
-    private final Column[] doubleColumns;
-    private final Column[] integerColumns;
-    private final Column[] byteColumns;
-
-    public AbstractData(Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
-        Column[] integerColumns, Column[] byteColumns) {
-        this.dataStrings = new String[stringColumns.length];
-        this.dataLongs = new Long[longColumns.length];
-        this.dataDoubles = new Double[doubleColumns.length];
-        this.dataIntegers = new Integer[integerColumns.length];
-        this.dataBytes = new byte[byteColumns.length][];
+public abstract class AbstractData implements RemoteData {
+    private final String[] dataStrings;
+    private final Long[] dataLongs;
+    private final Double[] dataDoubles;
+    private final Integer[] dataIntegers;
+    private final byte[][] dataBytes;
+
+    private final StringLinkedList[] dataStringLists;
+    private final LongLinkedList[] dataLongLists;
+    private final DoubleLinkedList[] dataDoubleLists;
+    private final IntegerLinkedList[] dataIntegerLists;
+
+    private final StringColumn[] stringColumns;
+    private final LongColumn[] longColumns;
+    private final IntegerColumn[] integerColumns;
+    private final DoubleColumn[] doubleColumns;
+    private final ByteColumn[] byteColumns;
+
+    private final StringListColumn[] stringListColumns;
+    private final LongListColumn[] longListColumns;
+    private final IntegerListColumn[] integerListColumns;
+    private final DoubleListColumn[] doubleListColumns;
+
+    AbstractData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns,
+        DoubleColumn[] doubleColumns, ByteColumn[] byteColumns,
+        StringListColumn[] stringListColumns,
+        LongListColumn[] longListColumns,
+        IntegerListColumn[] integerListColumns, DoubleListColumn[] doubleListColumns) {
         this.stringColumns = stringColumns;
         this.longColumns = longColumns;
-        this.doubleColumns = doubleColumns;
         this.integerColumns = integerColumns;
+        this.doubleColumns = doubleColumns;
         this.byteColumns = byteColumns;
+
+        this.stringListColumns = stringListColumns;
+        this.longListColumns = longListColumns;
+        this.integerListColumns = integerListColumns;
+        this.doubleListColumns = doubleListColumns;
+
+        this.dataStrings = new String[stringColumns.length];
+        this.dataLongs = new Long[longColumns.length];
+        this.dataIntegers = new Integer[integerColumns.length];
+        this.dataDoubles = new Double[doubleColumns.length];
+        this.dataBytes = new byte[byteColumns.length][];
+
+        this.dataStringLists = new StringLinkedList[stringListColumns.length];
+        for (int i = 0; i < this.dataStringLists.length; i++) {
+            this.dataStringLists[i] = new StringLinkedList();
+        }
+
+        this.dataLongLists = new LongLinkedList[longListColumns.length];
+        for (int i = 0; i < this.dataLongLists.length; i++) {
+            this.dataLongLists[i] = new LongLinkedList();
+        }
+
+        this.dataIntegerLists = new IntegerLinkedList[integerListColumns.length];
+        for (int i = 0; i < this.dataIntegerLists.length; i++) {
+            this.dataIntegerLists[i] = new IntegerLinkedList();
+        }
+
+        this.dataDoubleLists = new DoubleLinkedList[doubleListColumns.length];
+        for (int i = 0; i < this.dataDoubleLists.length; i++) {
+            this.dataDoubleLists[i] = new DoubleLinkedList();
+        }
     }
 
-    public final int getDataStringsCount() {
+    @Override public final int getDataStringsCount() {
         return dataStrings.length;
     }
 
-    public final int getDataLongsCount() {
+    @Override public final int getDataLongsCount() {
         return dataLongs.length;
     }
 
-    public final int getDataDoublesCount() {
+    @Override public final int getDataDoublesCount() {
         return dataDoubles.length;
     }
 
-    public final int getDataIntegersCount() {
+    @Override public final int getDataIntegersCount() {
         return dataIntegers.length;
     }
 
-    public final int getDataBytesCount() {
+    @Override public final int getDataBytesCount() {
         return dataBytes.length;
     }
 
-    public final void setDataString(int position, String value) {
+    @Override public int getDataStringListsCount() {
+        return dataStringLists.length;
+    }
+
+    @Override public int getDataLongListsCount() {
+        return dataLongLists.length;
+    }
+
+    @Override public int getDataDoubleListsCount() {
+        return dataDoubleLists.length;
+    }
+
+    @Override public int getDataIntegerListsCount() {
+        return dataIntegerLists.length;
+    }
+
+    @Override public final void setDataString(int position, String value) {
         dataStrings[position] = value;
     }
 
-    public final void setDataLong(int position, Long value) {
+    @Override public final void setDataLong(int position, Long value) {
         dataLongs[position] = value;
     }
 
-    public final void setDataDouble(int position, Double value) {
+    @Override public final void setDataDouble(int position, Double value) {
         dataDoubles[position] = value;
     }
 
-    public final void setDataInteger(int position, Integer value) {
+    @Override public final void setDataInteger(int position, Integer value) {
         dataIntegers[position] = value;
     }
 
-    public final void setDataBytes(int position, byte[] dataBytes) {
+    @Override public final void setDataBytes(int position, byte[] dataBytes) {
         this.dataBytes[position] = dataBytes;
     }
 
-    public final String getDataString(int position) {
+    @Override public final String getDataString(int position) {
         return dataStrings[position];
     }
 
-    public final Long getDataLong(int position) {
+    @Override public final Long getDataLong(int position) {
         if (position + 1 > dataLongs.length) {
             throw new IndexOutOfBoundsException();
         } else if (dataLongs[position] == null) {
@@ -103,7 +162,7 @@ public final Long getDataLong(int position) {
         }
     }
 
-    public final Double getDataDouble(int position) {
+    @Override public final Double getDataDouble(int position) {
         if (position + 1 > dataDoubles.length) {
             throw new IndexOutOfBoundsException();
         } else if (dataDoubles[position] == null) {
@@ -113,7 +172,7 @@ public final Double getDataDouble(int position) {
         }
     }
 
-    public final Integer getDataInteger(int position) {
+    @Override public final Integer getDataInteger(int position) {
         if (position + 1 > dataIntegers.length) {
             throw new IndexOutOfBoundsException();
         } else if (dataIntegers[position] == null) {
@@ -123,10 +182,42 @@ public final Integer getDataInteger(int position) {
         }
     }
 
-    public final byte[] getDataBytes(int position) {
+    @Override public final byte[] getDataBytes(int position) {
         return dataBytes[position];
     }
 
+    @Override public StringLinkedList getDataStringList(int position) {
+        if (position + 1 > dataStringLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataStringLists[position];
+        }
+    }
+
+    @Override public LongLinkedList getDataLongList(int position) {
+        if (position + 1 > dataLongLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataLongLists[position];
+        }
+    }
+
+    @Override public DoubleLinkedList getDataDoubleList(int position) {
+        if (position + 1 > dataDoubleLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataDoubleLists[position];
+        }
+    }
+
+    @Override public IntegerLinkedList getDataIntegerList(int position) {
+        if (position + 1 > dataIntegerLists.length) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            return dataIntegerLists[position];
+        }
+    }
+
     public final void mergeAndFormulaCalculateData(AbstractData newData) {
         mergeData(newData);
         calculateFormula();
@@ -153,6 +244,22 @@ private void mergeData(AbstractData newData) {
             byte[] byteData = byteColumns[i].getMergeOperation().operate(newData.getDataBytes(i), this.getDataBytes(i));
             this.dataBytes[i] = byteData;
         }
+        for (int i = 0; i < stringListColumns.length; i++) {
+            StringLinkedList stringListData = stringListColumns[i].getMergeOperation().operate(newData.getDataStringList(i), this.getDataStringList(i));
+            this.dataStringLists[i] = stringListData;
+        }
+        for (int i = 0; i < longListColumns.length; i++) {
+            LongLinkedList longListData = longListColumns[i].getMergeOperation().operate(newData.getDataLongList(i), this.getDataLongList(i));
+            this.dataLongLists[i] = longListData;
+        }
+        for (int i = 0; i < doubleListColumns.length; i++) {
+            DoubleLinkedList doubleListData = doubleListColumns[i].getMergeOperation().operate(newData.getDataDoubleList(i), this.getDataDoubleList(i));
+            this.dataDoubleLists[i] = doubleListData;
+        }
+        for (int i = 0; i < integerListColumns.length; i++) {
+            IntegerLinkedList integerListData = integerListColumns[i].getMergeOperation().operate(newData.getDataIntegerList(i), this.getDataIntegerList(i));
+            this.dataIntegerLists[i] = integerListData;
+        }
     }
 
     @SuppressWarnings("unchecked")
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java
index 862615433..540b3589e 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Data.java
@@ -30,6 +30,14 @@
 
     int getDataIntegersCount();
 
+    int getDataStringListsCount();
+
+    int getDataLongListsCount();
+
+    int getDataDoubleListsCount();
+
+    int getDataIntegerListsCount();
+
     int getDataBytesCount();
 
     void setDataString(int position, String value);
@@ -50,5 +58,13 @@
 
     Integer getDataInteger(int position);
 
+    StringLinkedList getDataStringList(int position);
+
+    LongLinkedList getDataLongList(int position);
+
+    DoubleLinkedList getDataDoubleList(int position);
+
+    IntegerLinkedList getDataIntegerList(int position);
+
     byte[] getDataBytes(int position);
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/DoubleLinkedList.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/DoubleLinkedList.java
new file mode 100644
index 000000000..b3254efe5
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/DoubleLinkedList.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data;
+
+import java.util.LinkedList;
+
+/**
+ * @author peng-yongsheng
+ */
+public class DoubleLinkedList extends LinkedList<Double> {
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/IntegerLinkedList.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/IntegerLinkedList.java
new file mode 100644
index 000000000..2374a2021
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/IntegerLinkedList.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data;
+
+import java.util.LinkedList;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IntegerLinkedList extends LinkedList<Integer> {
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/LongLinkedList.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/LongLinkedList.java
new file mode 100644
index 000000000..88dd6d984
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/LongLinkedList.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data;
+
+import java.util.LinkedList;
+
+/**
+ * @author peng-yongsheng
+ */
+public class LongLinkedList extends LinkedList<Long> {
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
index 1b3ddbaed..5ebc92706 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/MergeOperation.java
@@ -26,9 +26,17 @@
 
     Long operate(Long newValue, Long oldValue);
 
-    Double operate(Double newValue, Double oldValue);
-
     Integer operate(Integer newValue, Integer oldValue);
 
+    Double operate(Double newValue, Double oldValue);
+
     byte[] operate(byte[] newValue, byte[] oldValue);
+
+    StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue);
+
+    LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue);
+
+    IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue);
+
+    DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue);
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
index faf127eb5..ece2da938 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StreamData.java
@@ -18,12 +18,13 @@
 
 package org.apache.skywalking.apm.collector.core.data;
 
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
 
 /**
  * @author peng-yongsheng
  */
-public abstract class StreamData extends AbstractData implements RemoteData, QueueData {
+public abstract class StreamData extends AbstractData implements QueueData {
 
     private EndOfBatchContext endOfBatchContext;
 
@@ -35,9 +36,23 @@
         this.endOfBatchContext = context;
     }
 
-    public StreamData(Column[] stringColumns, Column[] longColumns, Column[] doubleColumns,
-        Column[] integerColumns, Column[] byteColumns) {
-        super(stringColumns, longColumns, doubleColumns, integerColumns, byteColumns);
+    public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns,
+        DoubleColumn[] doubleColumns, StringListColumn[] stringListColumns,
+        LongListColumn[] longListColumns,
+        IntegerListColumn[] integerListColumns, DoubleListColumn[] doubleListColumns) {
+        super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], stringListColumns, longListColumns, integerListColumns, doubleListColumns);
+    }
+
+    public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns) {
+        super(stringColumns, longColumns, integerColumns, doubleColumns, new ByteColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+    }
+
+    public StreamData(StringColumn[] stringColumns, LongColumn[] longColumns,
+        IntegerColumn[] integerColumns, DoubleColumn[] doubleColumns,
+        ByteColumn[] byteColumns) {
+        super(stringColumns, longColumns, integerColumns, doubleColumns, byteColumns, new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
     }
 
     @Override public final String selectKey() {
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StringLinkedList.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StringLinkedList.java
new file mode 100644
index 000000000..7293e6727
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/StringLinkedList.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data;
+
+import java.util.LinkedList;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StringLinkedList extends LinkedList<String> {
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/ByteColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/ByteColumn.java
new file mode 100644
index 000000000..2063af253
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/ByteColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ByteColumn extends Column {
+
+    public ByteColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public ByteColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Column.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/Column.java
similarity index 74%
rename from apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Column.java
rename to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/Column.java
index 3a651d299..d7fee6913 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/Column.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/Column.java
@@ -16,37 +16,40 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.core.data;
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
-public class Column {
+class Column {
     private final ColumnName columnName;
     private final MergeOperation mergeOperation;
     private final FormulaOperation formulaOperation;
 
-    public Column(ColumnName columnName, MergeOperation mergeOperation) {
+    Column(ColumnName columnName, MergeOperation mergeOperation) {
         this.columnName = columnName;
         this.mergeOperation = mergeOperation;
         this.formulaOperation = null;
     }
 
-    public Column(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+    Column(ColumnName columnName, MergeOperation mergeOperation,
+        FormulaOperation formulaOperation) {
         this.columnName = columnName;
         this.mergeOperation = mergeOperation;
         this.formulaOperation = formulaOperation;
     }
 
-    public ColumnName getColumnName() {
+    public final ColumnName getColumnName() {
         return columnName;
     }
 
-    MergeOperation getMergeOperation() {
+    public final MergeOperation getMergeOperation() {
         return mergeOperation;
     }
 
-    FormulaOperation getFormulaOperation() {
+    public final FormulaOperation getFormulaOperation() {
         return formulaOperation;
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleColumn.java
new file mode 100644
index 000000000..bc5182c8c
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class DoubleColumn extends Column {
+
+    public DoubleColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public DoubleColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleListColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleListColumn.java
new file mode 100644
index 000000000..f07da95e9
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/DoubleListColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class DoubleListColumn extends Column {
+
+    public DoubleListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public DoubleListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerColumn.java
new file mode 100644
index 000000000..6e734ce6b
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IntegerColumn extends Column {
+
+    public IntegerColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public IntegerColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerListColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerListColumn.java
new file mode 100644
index 000000000..cc92f1b16
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/IntegerListColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class IntegerListColumn extends Column {
+
+    public IntegerListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public IntegerListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongColumn.java
new file mode 100644
index 000000000..2319d8117
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class LongColumn extends Column {
+
+    public LongColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public LongColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongListColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongListColumn.java
new file mode 100644
index 000000000..406fc30c4
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/LongListColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class LongListColumn extends Column {
+
+    public LongListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public LongListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringColumn.java
new file mode 100644
index 000000000..915e73308
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StringColumn extends Column {
+
+    public StringColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public StringColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringListColumn.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringListColumn.java
new file mode 100644
index 000000000..6a12de468
--- /dev/null
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/column/StringListColumn.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.core.data.column;
+
+import org.apache.skywalking.apm.collector.core.data.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StringListColumn extends Column {
+
+    public StringListColumn(ColumnName columnName, MergeOperation mergeOperation) {
+        super(columnName, mergeOperation);
+    }
+
+    public StringListColumn(ColumnName columnName, MergeOperation mergeOperation, FormulaOperation formulaOperation) {
+        super(columnName, mergeOperation, formulaOperation);
+    }
+}
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java
index 2b7d416ce..c3465579e 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/AddMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -42,6 +42,22 @@
     }
 
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
-        throw new UnsupportedOperationException("not support byte addition operation");
+        throw new UnsupportedOperationException("not support byte array addition operation");
+    }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support string list addition operation");
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support long list addition operation");
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support integer list addition operation");
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support double list addition operation");
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java
index 5df0e1bcf..c49bd02fc 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/CoverMergeOperation.java
@@ -18,12 +18,13 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
  */
 public class CoverMergeOperation implements MergeOperation {
+
     @Override public String operate(String newValue, String oldValue) {
         return newValue;
     }
@@ -43,4 +44,20 @@
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
         return newValue;
     }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        return newValue;
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        return newValue;
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        return newValue;
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        return newValue;
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java
index 49cf398e2..81399caaa 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MaxMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -54,6 +54,22 @@
     }
 
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
-        throw new UnsupportedOperationException("not support byte maximum operation");
+        throw new UnsupportedOperationException("not support byte array maximum operation");
+    }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support string list maximum operation");
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support long list maximum operation");
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support integer list maximum operation");
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support double list maximum operation");
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java
index 3cd47ef6e..4bc6022ee 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/MinMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -54,6 +54,22 @@
     }
 
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
-        throw new UnsupportedOperationException("not support byte minimum operation");
+        throw new UnsupportedOperationException("not support byte array minimum operation");
+    }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support string list minimum operation");
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support long list minimum operation");
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support integer list minimum operation");
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        throw new UnsupportedOperationException("not support double list minimum operation");
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
index 42c778c28..d62d11f5f 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/data/operator/NonMergeOperation.java
@@ -18,7 +18,7 @@
 
 package org.apache.skywalking.apm.collector.core.data.operator;
 
-import org.apache.skywalking.apm.collector.core.data.MergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
 
 /**
  * @author peng-yongsheng
@@ -43,4 +43,20 @@
     @Override public byte[] operate(byte[] newValue, byte[] oldValue) {
         return oldValue;
     }
+
+    @Override public StringLinkedList operate(StringLinkedList newValue, StringLinkedList oldValue) {
+        return oldValue;
+    }
+
+    @Override public LongLinkedList operate(LongLinkedList newValue, LongLinkedList oldValue) {
+        return oldValue;
+    }
+
+    @Override public IntegerLinkedList operate(IntegerLinkedList newValue, IntegerLinkedList oldValue) {
+        return oldValue;
+    }
+
+    @Override public DoubleLinkedList operate(DoubleLinkedList newValue, DoubleLinkedList oldValue) {
+        return oldValue;
+    }
 }
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java
index 47aecad03..e2fe21aec 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteDeserializeService.java
@@ -31,14 +31,33 @@ public void deserialize(RemoteData remoteData, org.apache.skywalking.apm.collect
         for (int i = 0; i < remoteData.getDataStringsCount(); i++) {
             data.setDataString(i, remoteData.getDataStrings(i));
         }
+
         for (int i = 0; i < remoteData.getDataIntegersCount(); i++) {
             data.setDataInteger(i, remoteData.getDataIntegers(i));
         }
+
         for (int i = 0; i < remoteData.getDataLongsCount(); i++) {
             data.setDataLong(i, remoteData.getDataLongs(i));
         }
+
         for (int i = 0; i < remoteData.getDataDoublesCount(); i++) {
             data.setDataDouble(i, remoteData.getDataDoubles(i));
         }
+
+        for (int i = 0; i < remoteData.getDataStringListsCount(); i++) {
+            data.getDataStringList(i).addAll(remoteData.getDataStringLists(i).getValueList());
+        }
+
+        for (int i = 0; i < remoteData.getDataIntegerListsCount(); i++) {
+            data.getDataIntegerList(i).addAll(remoteData.getDataIntegerLists(i).getValueList());
+        }
+
+        for (int i = 0; i < remoteData.getDataLongListsCount(); i++) {
+            data.getDataLongList(i).addAll(remoteData.getDataLongLists(i).getValueList());
+        }
+
+        for (int i = 0; i < remoteData.getDataDoubleListsCount(); i++) {
+            data.getDataDoubleList(i).addAll(remoteData.getDataDoubleLists(i).getValueList());
+        }
     }
 }
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java
index ea7396e0e..207bbdc77 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/service/GRPCRemoteSerializeService.java
@@ -18,9 +18,8 @@
 
 package org.apache.skywalking.apm.collector.remote.grpc.service;
 
-import org.apache.skywalking.apm.collector.core.util.Const;
-import org.apache.skywalking.apm.collector.core.util.StringUtils;
-import org.apache.skywalking.apm.collector.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.apm.collector.core.util.*;
+import org.apache.skywalking.apm.collector.remote.grpc.proto.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteSerializeService;
 
 /**
@@ -29,23 +28,51 @@
 public class GRPCRemoteSerializeService implements RemoteSerializeService<RemoteData.Builder> {
 
     @Override public RemoteData.Builder serialize(org.apache.skywalking.apm.collector.core.data.RemoteData data) {
-        RemoteData.Builder builder = RemoteData.newBuilder();
+        RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
         for (int i = 0; i < data.getDataStringsCount(); i++) {
             if (StringUtils.isNotEmpty(data.getDataString(i))) {
-                builder.addDataStrings(data.getDataString(i));
+                remoteBuilder.addDataStrings(data.getDataString(i));
             } else {
-                builder.addDataStrings(Const.EMPTY_STRING);
+                remoteBuilder.addDataStrings(Const.EMPTY_STRING);
             }
         }
+
         for (int i = 0; i < data.getDataIntegersCount(); i++) {
-            builder.addDataIntegers(data.getDataInteger(i));
+            remoteBuilder.addDataIntegers(data.getDataInteger(i));
         }
+
         for (int i = 0; i < data.getDataLongsCount(); i++) {
-            builder.addDataLongs(data.getDataLong(i));
+            remoteBuilder.addDataLongs(data.getDataLong(i));
         }
+
         for (int i = 0; i < data.getDataDoublesCount(); i++) {
-            builder.addDataDoubles(data.getDataDouble(i));
+            remoteBuilder.addDataDoubles(data.getDataDouble(i));
+        }
+
+        for (int i = 0; i < data.getDataStringListsCount(); i++) {
+            StringList.Builder stringList = StringList.newBuilder();
+            data.getDataStringList(i).forEach(stringList::addValue);
+            remoteBuilder.addDataStringLists(stringList);
+        }
+
+        for (int i = 0; i < data.getDataLongListsCount(); i++) {
+            LongList.Builder longList = LongList.newBuilder();
+            data.getDataLongList(i).forEach(longList::addValue);
+            remoteBuilder.addDataLongLists(longList);
+        }
+
+        for (int i = 0; i < data.getDataIntegerListsCount(); i++) {
+            IntegerList.Builder integerList = IntegerList.newBuilder();
+            data.getDataIntegerList(i).forEach(integerList::addValue);
+            remoteBuilder.addDataIntegerLists(integerList);
         }
-        return builder;
+
+        for (int i = 0; i < data.getDataDoubleListsCount(); i++) {
+            DoubleList.Builder doubleList = DoubleList.newBuilder();
+            data.getDataDoubleList(i).forEach(doubleList::addValue);
+            remoteBuilder.addDataDoubleLists(doubleList);
+        }
+
+        return remoteBuilder;
     }
 }
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto
index 7b3643523..04dcf46c4 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/proto/RemoteCommonService.proto
@@ -39,7 +39,27 @@ message RemoteData {
     repeated double dataDoubles = 3;
     repeated int32 dataIntegers = 4;
     repeated bool dataBooleans = 5;
+    repeated StringList dataStringLists = 6;
+    repeated LongList dataLongLists = 7;
+    repeated DoubleList dataDoubleLists = 8;
+    repeated IntegerList dataIntegerLists = 9;
 }
 
 message Empty {
 }
+
+message StringList {
+    repeated string value = 1;
+}
+
+message LongList {
+    repeated int64 value = 1;
+}
+
+message DoubleList {
+    repeated double value = 1;
+}
+
+message IntegerList {
+    repeated int32 value = 1;
+}
\ No newline at end of file
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
index 44bd3c5da..af918f346 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarm.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,27 +28,23 @@
  */
 public class ApplicationAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationAlarmTable.ID, new NonMergeOperation()),
-        new Column(ApplicationAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationAlarmTable.APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
index 3561e57c8..d9627cc78 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationAlarmList.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,28 +28,24 @@
  */
 public class ApplicationAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.METRIC_ID, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationAlarmListTable.METRIC_ID, new NonMergeOperation()),
+        new StringColumn(ApplicationAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
index 8cdf2bbd4..8485720fc 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarm.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,28 +28,24 @@
  */
 public class ApplicationReferenceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationReferenceAlarmTable.ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationReferenceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
index 61ca18168..6ff140a17 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ApplicationReferenceAlarmList.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,28 +28,24 @@
  */
 public class ApplicationReferenceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationReferenceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationReferenceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
index e4e67aabb..cd3ab1ded 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarm.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,28 +28,24 @@
  */
 public class InstanceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceAlarmTable.ID, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
index 37fdda5bf..ef8f4bc1a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceAlarmList.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,28 +28,24 @@
  */
 public class InstanceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
index b8cf1afae..ca4a8839f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarm.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,30 +28,26 @@
  */
 public class InstanceReferenceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceReferenceAlarmTable.ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceReferenceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
index 599669d05..4814a0bcd 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/InstanceReferenceAlarmList.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,30 +28,26 @@
  */
 public class InstanceReferenceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceReferenceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceReferenceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
index 6db644e93..f3dc1dc8b 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarm.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,29 +28,25 @@
  */
 public class ServiceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceAlarmTable.ID, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmTable.SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmTable.SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
index 252bfa639..98bb5aad5 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceAlarmList.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,29 +28,25 @@
  */
 public class ServiceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceAlarmListTable.SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceAlarmListTable.SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
index 62017fc45..b4483f821 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarm.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,32 +28,28 @@
  */
 public class ServiceReferenceAlarm extends StreamData implements Alarm {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceReferenceAlarmTable.ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceReferenceAlarmTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceReferenceAlarmTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceReferenceAlarmTable.LAST_TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.FRONT_SERVICE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceReferenceAlarmTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.FRONT_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceReferenceAlarm() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
index d13633f84..d42b240c9 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/alarm/ServiceReferenceAlarmList.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.alarm;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,32 +28,28 @@
  */
 public class ServiceReferenceAlarmList extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceReferenceAlarmListTable.ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceReferenceAlarmListTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceReferenceAlarmListTable.ALARM_CONTENT, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceReferenceAlarmListTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceReferenceAlarmListTable.ALARM_TYPE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.FRONT_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceAlarmListTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceReferenceAlarmList() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
index f053c5dcb..31c44a344 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationComponent.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,26 +28,22 @@
  */
 public class ApplicationComponent extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationComponentTable.ID, new NonMergeOperation()),
-        new Column(ApplicationComponentTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationComponentTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationComponentTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationComponentTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationComponentTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationComponentTable.COMPONENT_ID, new CoverMergeOperation()),
-        new Column(ApplicationComponentTable.APPLICATION_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationComponentTable.COMPONENT_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationComponentTable.APPLICATION_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationComponent() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
index 249c1921e..811b05166 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMapping.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,26 +28,22 @@
  */
 public class ApplicationMapping extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationMappingTable.ID, new NonMergeOperation()),
-        new Column(ApplicationMappingTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationMappingTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationMappingTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationMappingTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationMappingTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationMappingTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(ApplicationMappingTable.MAPPING_APPLICATION_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationMappingTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationMappingTable.MAPPING_APPLICATION_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationMapping() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
index d62e5d5f8..694ddefd1 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,44 +29,41 @@
  */
 public class ApplicationMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationMetricTable.ID, new NonMergeOperation()),
-        new Column(ApplicationMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ApplicationMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
-        new Column(ApplicationMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ApplicationMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
     };
-    private static final Column[] DOUBLE_COLUMNS = {};
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationMetricTable.APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationMetricTable.APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
index 8124ef749..e0f994dca 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/application/ApplicationReferenceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.application;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,47 +29,43 @@
  */
 public class ApplicationReferenceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationReferenceMetricTable.ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationReferenceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ApplicationReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
-
-        new Column(ApplicationReferenceMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ApplicationReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+
+        new LongColumn(ApplicationReferenceMetricTable.SATISFIED_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.TOLERATING_COUNT, new AddMergeOperation()),
+        new LongColumn(ApplicationReferenceMetricTable.FRUSTRATED_COUNT, new AddMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ApplicationReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ApplicationReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
index 4184d2401..44bb44cc7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/GlobalTrace.java
@@ -18,35 +18,27 @@
 
 package org.apache.skywalking.apm.collector.storage.table.global;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class GlobalTrace extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(GlobalTraceTable.ID, new NonMergeOperation()),
-        new Column(GlobalTraceTable.SEGMENT_ID, new CoverMergeOperation()),
-        new Column(GlobalTraceTable.TRACE_ID, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(GlobalTraceTable.ID, new NonMergeOperation()),
+        new StringColumn(GlobalTraceTable.SEGMENT_ID, new CoverMergeOperation()),
+        new StringColumn(GlobalTraceTable.TRACE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(GlobalTraceTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(GlobalTraceTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public GlobalTrace() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
index c4e8e6d97..1974509a8 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/global/ResponseTimeDistribution.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.global;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -31,28 +28,24 @@
  */
 public class ResponseTimeDistribution extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ResponseTimeDistributionTable.ID, new NonMergeOperation()),
-        new Column(ResponseTimeDistributionTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ResponseTimeDistributionTable.ID, new NonMergeOperation()),
+        new StringColumn(ResponseTimeDistributionTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ResponseTimeDistributionTable.TIME_BUCKET, new CoverMergeOperation()),
-        new Column(ResponseTimeDistributionTable.CALLS, new AddMergeOperation()),
-        new Column(ResponseTimeDistributionTable.ERROR_CALLS, new AddMergeOperation()),
-        new Column(ResponseTimeDistributionTable.SUCCESS_CALLS, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ResponseTimeDistributionTable.TIME_BUCKET, new CoverMergeOperation()),
+        new LongColumn(ResponseTimeDistributionTable.CALLS, new AddMergeOperation()),
+        new LongColumn(ResponseTimeDistributionTable.ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ResponseTimeDistributionTable.SUCCESS_CALLS, new AddMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ResponseTimeDistributionTable.STEP, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ResponseTimeDistributionTable.STEP, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ResponseTimeDistribution() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
index 0b4677d18..518366327 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMapping.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,27 +28,23 @@
  */
 public class InstanceMapping extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceMappingTable.ID, new NonMergeOperation()),
-        new Column(InstanceMappingTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceMappingTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceMappingTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceMappingTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceMappingTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceMappingTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(InstanceMappingTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(InstanceMappingTable.ADDRESS_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceMappingTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMappingTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMappingTable.ADDRESS_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceMapping() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
index 384b9436c..8ffe0d8ee 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceMetric.java
@@ -18,13 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -33,43 +29,39 @@
  */
 public class InstanceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceMetricTable.ID, new NonMergeOperation()),
-        new Column(InstanceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(InstanceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(InstanceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceMetricTable.SOURCE_VALUE, new CoverMergeOperation()),
-        new Column(InstanceMetricTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(InstanceMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceMetricTable.SOURCE_VALUE, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMetricTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceMetricTable.INSTANCE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
index 75c26f8f6..822d8e717 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/instance/InstanceReferenceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.instance;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,45 +29,41 @@
  */
 public class InstanceReferenceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceReferenceMetricTable.ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceReferenceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(InstanceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(InstanceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(InstanceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(InstanceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public InstanceReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java
index 269352a35..36ee0ee3d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/CpuMetric.java
@@ -18,39 +18,35 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class CpuMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(CpuMetricTable.ID, new NonMergeOperation()),
-        new Column(CpuMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(CpuMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(CpuMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(CpuMetricTable.TIMES, new AddMergeOperation()),
-        new Column(CpuMetricTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(CpuMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(CpuMetricTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
-        new Column(CpuMetricTable.USAGE_PERCENT, new AddMergeOperation()),
+    private static final DoubleColumn[] DOUBLE_COLUMNS = {
+        new DoubleColumn(CpuMetricTable.USAGE_PERCENT, new AddMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(CpuMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(CpuMetricTable.INSTANCE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public CpuMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, DOUBLE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
index d032bd6c2..76d4cc6b4 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/GCMetric.java
@@ -18,7 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.StreamData;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
@@ -26,30 +27,25 @@
  */
 public class GCMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(GCMetricTable.ID, new NonMergeOperation()),
-        new Column(GCMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(GCMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(GCMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(GCMetricTable.COUNT, new AddMergeOperation()),
-        new Column(GCMetricTable.TIMES, new AddMergeOperation()),
-        new Column(GCMetricTable.TIME_BUCKET, new CoverMergeOperation()),
-        new Column(GCMetricTable.DURATION, new AddMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(GCMetricTable.COUNT, new AddMergeOperation()),
+        new LongColumn(GCMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(GCMetricTable.TIME_BUCKET, new CoverMergeOperation()),
+        new LongColumn(GCMetricTable.DURATION, new AddMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(GCMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(GCMetricTable.PHRASE, new CoverMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(GCMetricTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(GCMetricTable.PHRASE, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public GCMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
index 444924a68..b37514a5e 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java
@@ -18,45 +18,36 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MaxMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MinMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class MemoryMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(MemoryMetricTable.ID, new NonMergeOperation()),
-        new Column(MemoryMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(MemoryMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(MemoryMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(MemoryMetricTable.INIT, new MinMergeOperation()),
-        new Column(MemoryMetricTable.MAX, new MaxMergeOperation()),
-        new Column(MemoryMetricTable.USED, new AddMergeOperation()),
-        new Column(MemoryMetricTable.COMMITTED, new AddMergeOperation()),
-        new Column(MemoryMetricTable.TIMES, new AddMergeOperation()),
-        new Column(MemoryMetricTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(MemoryMetricTable.INIT, new MinMergeOperation()),
+        new LongColumn(MemoryMetricTable.MAX, new MaxMergeOperation()),
+        new LongColumn(MemoryMetricTable.USED, new AddMergeOperation()),
+        new LongColumn(MemoryMetricTable.COMMITTED, new AddMergeOperation()),
+        new LongColumn(MemoryMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(MemoryMetricTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(MemoryMetricTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(MemoryMetricTable.IS_HEAP, new CoverMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(MemoryMetricTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(MemoryMetricTable.IS_HEAP, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public MemoryMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
index e47876be1..54181360f 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java
@@ -18,45 +18,36 @@
 
 package org.apache.skywalking.apm.collector.storage.table.jvm;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MaxMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.MinMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class MemoryPoolMetric extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(MemoryPoolMetricTable.ID, new NonMergeOperation()),
-        new Column(MemoryPoolMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(MemoryPoolMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(MemoryPoolMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(MemoryPoolMetricTable.INIT, new MinMergeOperation()),
-        new Column(MemoryPoolMetricTable.MAX, new MaxMergeOperation()),
-        new Column(MemoryPoolMetricTable.USED, new AddMergeOperation()),
-        new Column(MemoryPoolMetricTable.COMMITTED, new AddMergeOperation()),
-        new Column(MemoryPoolMetricTable.TIMES, new AddMergeOperation()),
-        new Column(MemoryPoolMetricTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(MemoryPoolMetricTable.INIT, new MinMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.MAX, new MaxMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.USED, new AddMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.COMMITTED, new AddMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.TIMES, new AddMergeOperation()),
+        new LongColumn(MemoryPoolMetricTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(MemoryPoolMetricTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(MemoryPoolMetricTable.POOL_TYPE, new CoverMergeOperation()),
     };
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(MemoryPoolMetricTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(MemoryPoolMetricTable.POOL_TYPE, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public MemoryPoolMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
index 930462c7c..928d0324d 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Application.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.register;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,25 +28,19 @@
  */
 public class Application extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ApplicationTable.ID, new NonMergeOperation()),
-        new Column(ApplicationTable.APPLICATION_CODE, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ApplicationTable.ID, new NonMergeOperation()),
+        new StringColumn(ApplicationTable.APPLICATION_CODE, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {};
-
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ApplicationTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(ApplicationTable.ADDRESS_ID, new CoverMergeOperation()),
-        new Column(ApplicationTable.IS_ADDRESS, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ApplicationTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationTable.ADDRESS_ID, new CoverMergeOperation()),
+        new IntegerColumn(ApplicationTable.IS_ADDRESS, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public Application() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
index 6e3d0b936..e8a78cb14 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/Instance.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,31 +28,27 @@
  */
 public class Instance extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(InstanceTable.ID, new NonMergeOperation()),
-        new Column(InstanceTable.AGENT_UUID, new CoverMergeOperation()),
-        new Column(InstanceTable.OS_INFO, new CoverMergeOperation()),
-        new Column(InstanceTable.APPLICATION_CODE, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(InstanceTable.ID, new NonMergeOperation()),
+        new StringColumn(InstanceTable.AGENT_UUID, new CoverMergeOperation()),
+        new StringColumn(InstanceTable.OS_INFO, new CoverMergeOperation()),
+        new StringColumn(InstanceTable.APPLICATION_CODE, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(InstanceTable.REGISTER_TIME, new CoverMergeOperation()),
-        new Column(InstanceTable.HEARTBEAT_TIME, new MaxMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(InstanceTable.REGISTER_TIME, new CoverMergeOperation()),
+        new LongColumn(InstanceTable.HEARTBEAT_TIME, new MaxMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(InstanceTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(InstanceTable.INSTANCE_ID, new CoverMergeOperation()),
-        new Column(InstanceTable.ADDRESS_ID, new CoverMergeOperation()),
-        new Column(InstanceTable.IS_ADDRESS, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(InstanceTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceTable.INSTANCE_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceTable.ADDRESS_ID, new CoverMergeOperation()),
+        new IntegerColumn(InstanceTable.IS_ADDRESS, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public Instance() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
index 8e54a16c1..831f1135a 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/NetworkAddress.java
@@ -18,11 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.register;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
 /**
@@ -30,26 +28,20 @@
  */
 public class NetworkAddress extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(NetworkAddressTable.ID, new NonMergeOperation()),
-        new Column(NetworkAddressTable.NETWORK_ADDRESS, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(NetworkAddressTable.ID, new NonMergeOperation()),
+        new StringColumn(NetworkAddressTable.NETWORK_ADDRESS, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(NetworkAddressTable.ADDRESS_ID, new NonMergeOperation()),
+        new IntegerColumn(NetworkAddressTable.SRC_SPAN_LAYER, new CoverMergeOperation()),
+        new IntegerColumn(NetworkAddressTable.SERVER_TYPE, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(NetworkAddressTable.ADDRESS_ID, new NonMergeOperation()),
-        new Column(NetworkAddressTable.SRC_SPAN_LAYER, new CoverMergeOperation()),
-        new Column(NetworkAddressTable.SERVER_TYPE, new CoverMergeOperation()),
-    };
-
-    private static final Column[] BYTE_COLUMNS = {};
-
     public NetworkAddress() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, new LongColumn[0], INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
index 6f860b023..d04a7f054 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/register/ServiceName.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.collector.storage.table.register;
 
 import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
 import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 
@@ -27,28 +28,24 @@
  */
 public class ServiceName extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceNameTable.ID, new NonMergeOperation()),
-        new Column(ServiceNameTable.SERVICE_NAME, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceNameTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceNameTable.SERVICE_NAME, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceNameTable.REGISTER_TIME, new NonMergeOperation()),
-        new Column(ServiceNameTable.HEARTBEAT_TIME, new MaxMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceNameTable.REGISTER_TIME, new NonMergeOperation()),
+        new LongColumn(ServiceNameTable.HEARTBEAT_TIME, new MaxMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceNameTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(ServiceNameTable.SERVICE_ID, new CoverMergeOperation()),
-        new Column(ServiceNameTable.SRC_SPAN_TYPE, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceNameTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(ServiceNameTable.SERVICE_ID, new CoverMergeOperation()),
+        new IntegerColumn(ServiceNameTable.SRC_SPAN_TYPE, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceName() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
index f9ee9f737..3a411c706 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/Segment.java
@@ -18,35 +18,29 @@
 
 package org.apache.skywalking.apm.collector.storage.table.segment;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
 import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class Segment extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(SegmentTable.ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(SegmentTable.ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(SegmentTable.TIME_BUCKET, new NonMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(SegmentTable.TIME_BUCKET, new NonMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-    };
-
-    private static final Column[] BYTE_COLUMNS = {
-        new Column(SegmentTable.DATA_BINARY, new CoverMergeOperation()),
+    private static final ByteColumn[] BYTE_COLUMNS = {
+        new ByteColumn(SegmentTable.DATA_BINARY, new CoverMergeOperation()),
     };
 
     public Segment() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, new IntegerColumn[0], new DoubleColumn[0], BYTE_COLUMNS);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java
index a309c2d7d..f3af6861b 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/segment/SegmentDuration.java
@@ -18,41 +18,39 @@
 
 package org.apache.skywalking.apm.collector.storage.table.segment;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 
 /**
  * @author peng-yongsheng
  */
 public class SegmentDuration extends StreamData {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(SegmentDurationTable.ID, new NonMergeOperation()),
-        new Column(SegmentDurationTable.SEGMENT_ID, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.SERVICE_NAME, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.TRACE_ID, new CoverMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(SegmentDurationTable.ID, new NonMergeOperation()),
+        new StringColumn(SegmentDurationTable.SEGMENT_ID, new CoverMergeOperation()),
+        new StringColumn(SegmentDurationTable.TRACE_ID, new CoverMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(SegmentDurationTable.DURATION, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.START_TIME, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.END_TIME, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.TIME_BUCKET, new CoverMergeOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(SegmentDurationTable.DURATION, new CoverMergeOperation()),
+        new LongColumn(SegmentDurationTable.START_TIME, new CoverMergeOperation()),
+        new LongColumn(SegmentDurationTable.END_TIME, new CoverMergeOperation()),
+        new LongColumn(SegmentDurationTable.TIME_BUCKET, new CoverMergeOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(SegmentDurationTable.APPLICATION_ID, new CoverMergeOperation()),
-        new Column(SegmentDurationTable.IS_ERROR, new CoverMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(SegmentDurationTable.APPLICATION_ID, new CoverMergeOperation()),
+        new IntegerColumn(SegmentDurationTable.IS_ERROR, new CoverMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
+    private static final StringListColumn[] STRING_LIST_COLUMNS = {
+        new StringListColumn(SegmentDurationTable.SERVICE_NAME, new CoverMergeOperation()),
+    };
 
     public SegmentDuration() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0], STRING_LIST_COLUMNS, new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
     }
 
     @Override public String getId() {
@@ -79,20 +77,16 @@ public void setSegmentId(String segmentId) {
         setDataString(1, segmentId);
     }
 
-    public String getServiceName() {
+    public String getTraceId() {
         return getDataString(2);
     }
 
-    public void setServiceName(String serviceName) {
-        setDataString(2, serviceName);
-    }
-
-    public String getTraceId() {
-        return getDataString(3);
+    public void setTraceId(String traceId) {
+        setDataString(2, traceId);
     }
 
-    public void setTraceId(String traceId) {
-        setDataString(3, traceId);
+    public StringLinkedList getServiceName() {
+        return getDataStringList(0);
     }
 
     public Long getDuration() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
index fc3f6b7e0..91cdf74d3 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.service;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,44 +29,40 @@
  */
 public class ServiceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceMetricTable.ID, new NonMergeOperation()),
-        new Column(ServiceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ServiceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ServiceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
-
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
-        new Column(ServiceMetricTable.APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceMetricTable.INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceMetricTable.SERVICE_ID, new NonMergeOperation()),
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceMetricTable.APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceMetricTable.INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceMetricTable.SERVICE_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0]);
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
index 638a37520..e644e02ee 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/table/service/ServiceReferenceMetric.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.collector.storage.table.service;
 
-import org.apache.skywalking.apm.collector.core.data.Column;
-import org.apache.skywalking.apm.collector.core.data.FormulaOperation;
-import org.apache.skywalking.apm.collector.core.data.RemoteData;
-import org.apache.skywalking.apm.collector.core.data.StreamData;
-import org.apache.skywalking.apm.collector.core.data.operator.AddMergeOperation;
-import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
+import org.apache.skywalking.apm.collector.core.data.*;
+import org.apache.skywalking.apm.collector.core.data.column.*;
+import org.apache.skywalking.apm.collector.core.data.operator.*;
 import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
 import org.apache.skywalking.apm.collector.storage.table.Metric;
 
@@ -32,50 +29,47 @@
  */
 public class ServiceReferenceMetric extends StreamData implements Metric {
 
-    private static final Column[] STRING_COLUMNS = {
-        new Column(ServiceReferenceMetricTable.ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
+    private static final StringColumn[] STRING_COLUMNS = {
+        new StringColumn(ServiceReferenceMetricTable.ID, new NonMergeOperation()),
+        new StringColumn(ServiceReferenceMetricTable.METRIC_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] LONG_COLUMNS = {
-        new Column(ServiceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
-
-        new Column(ServiceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
-        new Column(ServiceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
+    private static final LongColumn[] LONG_COLUMNS = {
+        new LongColumn(ServiceReferenceMetricTable.TIME_BUCKET, new NonMergeOperation()),
+
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new TransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.BUSINESS_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new BusinessTransactionAverageDurationFormulaOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_CALLS, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_ERROR_DURATION_SUM, new AddMergeOperation()),
+        new LongColumn(ServiceReferenceMetricTable.MQ_TRANSACTION_AVERAGE_DURATION, new NonMergeOperation(), new MqTransactionAverageDurationFormulaOperation()),
     };
 
-    private static final Column[] DOUBLE_COLUMNS = {};
+    private static final IntegerColumn[] INTEGER_COLUMNS = {
+        new IntegerColumn(ServiceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
 
-    private static final Column[] INTEGER_COLUMNS = {
-        new Column(ServiceReferenceMetricTable.SOURCE_VALUE, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.FRONT_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
 
-        new Column(ServiceReferenceMetricTable.FRONT_SERVICE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BEHIND_SERVICE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
 
-        new Column(ServiceReferenceMetricTable.FRONT_INSTANCE_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BEHIND_INSTANCE_ID, new NonMergeOperation()),
-
-        new Column(ServiceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
-        new Column(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.FRONT_APPLICATION_ID, new NonMergeOperation()),
+        new IntegerColumn(ServiceReferenceMetricTable.BEHIND_APPLICATION_ID, new NonMergeOperation()),
     };
 
-    private static final Column[] BYTE_COLUMNS = {};
-
     public ServiceReferenceMetric() {
-        super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BYTE_COLUMNS);
+        super(STRING_COLUMNS, LONG_COLUMNS, INTEGER_COLUMNS, new DoubleColumn[0], new StringListColumn[0], new LongListColumn[0], new IntegerListColumn[0], new DoubleListColumn[0]);
+
     }
 
     @Override public String getId() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java
index 0d21d3ee3..4f99c6f74 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/ui/trace/BasicTrace.java
@@ -18,8 +18,7 @@
 
 package org.apache.skywalking.apm.collector.storage.ui.trace;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 
 /**
  * @author peng-yongsheng
@@ -27,12 +26,17 @@
 public class BasicTrace {
 
     private String segmentId;
-    private String operationName;
+    private List<String> operationNames;
     private int duration;
     private long start;
     private Boolean isError;
     private List<String> traceIds;
 
+    public BasicTrace() {
+        this.traceIds = new LinkedList<>();
+        this.operationNames = new LinkedList<>();
+    }
+
     public String getSegmentId() {
         return segmentId;
     }
@@ -41,16 +45,8 @@ public void setSegmentId(String segmentId) {
         this.segmentId = segmentId;
     }
 
-    public BasicTrace() {
-        this.traceIds = new LinkedList<>();
-    }
-
-    public String getOperationName() {
-        return operationName;
-    }
-
-    public void setOperationName(String operationName) {
-        this.operationName = operationName;
+    public List<String> getOperationName() {
+        return operationNames;
     }
 
     public int getDuration() {
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
index 30de27dbf..24a739dac 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/SegmentDurationEsPersistenceDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao;
 
+import com.google.gson.Gson;
 import java.util.*;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
@@ -36,6 +37,8 @@
 
     private final Logger logger = LoggerFactory.getLogger(SegmentDurationEsPersistenceDAO.class);
 
+    private final Gson gson = new Gson();
+
     public SegmentDurationEsPersistenceDAO(ElasticSearchClient client) {
         super(client);
     }
@@ -55,7 +58,7 @@ public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
         Map<String, Object> target = new HashMap<>();
         target.put(SegmentDurationTable.SEGMENT_ID.getName(), data.getSegmentId());
         target.put(SegmentDurationTable.APPLICATION_ID.getName(), data.getApplicationId());
-        target.put(SegmentDurationTable.SERVICE_NAME.getName(), data.getServiceName());
+        target.put(SegmentDurationTable.SERVICE_NAME.getName(), gson.toJson(data.getServiceName()));
         target.put(SegmentDurationTable.DURATION.getName(), data.getDuration());
         target.put(SegmentDurationTable.START_TIME.getName(), data.getStartTime());
         target.put(SegmentDurationTable.END_TIME.getName(), data.getEndTime());
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java
index e12f1e43b..7e65b4721 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/dao/ui/SegmentDurationEsUIDAO.java
@@ -18,7 +18,8 @@
 
 package org.apache.skywalking.apm.collector.storage.es.dao.ui;
 
-import java.util.List;
+import com.google.gson.Gson;
+import java.util.*;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.util.*;
 import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO;
@@ -35,6 +36,8 @@
  */
 public class SegmentDurationEsUIDAO extends EsDAO implements ISegmentDurationUIDAO {
 
+    private final Gson gson = new Gson();
+
     public SegmentDurationEsUIDAO(ElasticSearchClient client) {
         super(client);
     }
@@ -102,7 +105,12 @@ public TraceBrief loadTop(long startSecondTimeBucket, long endSecondTimeBucket,
 
             basicTrace.setSegmentId((String)searchHit.getSource().get(SegmentDurationTable.SEGMENT_ID.getName()));
             basicTrace.setStart(((Number)searchHit.getSource().get(SegmentDurationTable.START_TIME.getName())).longValue());
-            basicTrace.setOperationName((String)searchHit.getSource().get(SegmentDurationTable.SERVICE_NAME.getName()));
+
+            String serviceNameJsonStr = (String)searchHit.getSource().get(SegmentDurationTable.SERVICE_NAME.getName());
+            if (StringUtils.isNotEmpty(serviceNameJsonStr)) {
+                List serviceNames = gson.fromJson(serviceNameJsonStr, LinkedList.class);
+                basicTrace.getOperationName().addAll(serviceNames);
+            }
             basicTrace.setDuration(((Number)searchHit.getSource().get(SegmentDurationTable.DURATION.getName())).intValue());
             basicTrace.setError(BooleanUtils.valueToBoolean(((Number)searchHit.getSource().get(SegmentDurationTable.IS_ERROR.getName())).intValue()));
             traceBrief.getTraces().add(basicTrace);
diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java
index 5f154e831..2ffddc7b0 100644
--- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/dao/ui/SegmentDurationH2UIDAO.java
@@ -128,7 +128,7 @@ public TraceBrief loadTop(long startSecondTimeBucket, long endSecondTimeBucket,
                 basicTrace.setSegmentId(rs.getString(SegmentDurationTable.SEGMENT_ID.getName()));
                 basicTrace.setDuration(rs.getInt(SegmentDurationTable.DURATION.getName()));
                 basicTrace.setStart(rs.getLong(SegmentDurationTable.START_TIME.getName()));
-                basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
+//                basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
                 basicTrace.setError(BooleanUtils.valueToBoolean(rs.getInt(SegmentDurationTable.IS_ERROR.getName())));
                 traceBrief.getTraces().add(basicTrace);
                 cnt++;
diff --git a/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java
index 5510caa0f..eac02da75 100644
--- a/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-shardingjdbc-provider/src/main/java/org/apache/skywalking/apm/collector/storage/shardingjdbc/dao/ui/SegmentDurationShardingjdbcUIDAO.java
@@ -141,7 +141,8 @@ public TraceBrief loadTop(long startSecondTimeBucket, long endSecondTimeBucket,
                 basicTrace.setSegmentId(rs.getString(SegmentDurationTable.SEGMENT_ID.getName()));
                 basicTrace.setDuration(rs.getInt(SegmentDurationTable.DURATION.getName()));
                 basicTrace.setStart(rs.getLong(SegmentDurationTable.START_TIME.getName()));
-                basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
+                //TODO linjiaqi operation name was changed to contains multiple values
+                //basicTrace.setOperationName(rs.getString(SegmentDurationTable.SERVICE_NAME.getName()));
                 basicTrace.setError(BooleanUtils.valueToBoolean(rs.getInt(SegmentDurationTable.IS_ERROR.getName())));
                 traceBrief.getTraces().add(basicTrace);
                 cnt++;
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java
index d7274652b..c9d3f6223 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/test/java/org/apache/skywalking/apm/collector/ui/service/SegmentTopServiceTest.java
@@ -83,7 +83,7 @@ private TraceBrief getTrace() {
         BasicTrace basicTrace = new BasicTrace();
         basicTrace.setDuration(12);
         basicTrace.setError(false);
-        basicTrace.setOperationName("test");
+        basicTrace.getOperationName().add("test");
         basicTrace.setSegmentId("segmentId");
         basicTrace.setStart(System.currentTimeMillis());
         basicTrace.setTraceIds(Collections.singletonList("traceId"));
diff --git a/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls b/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls
index e6f336b28..9bb30beea 100644
--- a/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls
+++ b/apm-protocol/apm-ui-protocol/src/main/resources/ui-graphql/trace.graphqls
@@ -23,7 +23,7 @@ type TraceBrief {
 # Trace basic info
 type BasicTrace {
    segmentId: String!
-   operationName: String!
+   operationNames: [String!]!
    duration: Int!
    start: String!
    isError: Boolean


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services