You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/07/07 08:38:52 UTC

[50/50] [abbrv] kylin git commit: KYLIN-2606 Only return counter for precise count_distinct if query is exactAggregate

KYLIN-2606 Only return counter for precise count_distinct if query is exactAggregate


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f40225d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f40225d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f40225d

Branch: refs/heads/KYLIN-2606
Commit: 1f40225d1a80ece1babf107b3ef5cc7de8baacb7
Parents: d509542
Author: kangkaisen <ka...@163.com>
Authored: Wed Feb 15 19:53:17 2017 +0800
Committer: 康凯森 <ka...@meituan.com>
Committed: Fri Jul 7 13:47:12 2017 +0800

----------------------------------------------------------------------
 .../kylin/cube/gridtable/CubeCodeSystem.java    |   4 +
 .../org/apache/kylin/gridtable/GTRecord.java    |   8 +
 .../kylin/gridtable/GTSampleCodeSystem.java     |   4 +
 .../apache/kylin/gridtable/IGTCodeSystem.java   |   3 +
 .../measure/bitmap/BitmapCounterFactory.java    |   2 +
 .../kylin/measure/bitmap/BitmapSerializer.java  |  37 ++++-
 .../measure/bitmap/RoaringBitmapCounter.java    |  10 ++
 .../bitmap/RoaringBitmapCounterFactory.java     |   5 +
 .../metadata/datatype/DataTypeSerializer.java   |  13 ++
 .../gtrecord/GTCubeStorageQueryBase.java        |  53 ++++++-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   1 +
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |   2 +-
 .../hbase/cube/v2/HBaseReadonlyStore.java       |  33 +++-
 .../coprocessor/endpoint/CubeVisitService.java  |   2 +-
 .../endpoint/generated/CubeVisitProtos.java     | 151 +++++++++++++++----
 .../endpoint/protobuf/CubeVisit.proto           |   1 +
 16 files changed, 291 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index aaa12a7..9eae6f3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -177,4 +177,8 @@ public class CubeCodeSystem implements IGTCodeSystem {
         return result;
     }
 
+    @Override
+    public DataTypeSerializer<?> getSerializer(int col) {
+        return serializers[col];
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 3e62ea7..f65e4b5 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -296,4 +296,12 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
         }
     }
 
+    /** Change the pointer of one column to point to data in the given buffer; this
+     *  method allows a specific column to be loaded on its own */
+    public void loadColumns(int selectedCol, ByteBuffer buf) {
+        int pos = buf.position();
+        int len = info.codeSystem.codeLength(selectedCol, buf);
+        cols[selectedCol].set(buf.array(), buf.arrayOffset() + pos, len);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
index 3f3c844..2a5e791 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -118,4 +118,8 @@ public class GTSampleCodeSystem implements IGTCodeSystem {
         }
     };
 
+    @Override
+    public DataTypeSerializer<?> getSerializer(int col) {
+        return serializers[col];
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
index 89dfc99..9c8ad6b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 public interface IGTCodeSystem {
 
@@ -62,4 +63,6 @@ public interface IGTCodeSystem {
     /** Return aggregators for metrics */
     MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions);
 
+    /** Return the DataTypeSerializer of the given column */
+    DataTypeSerializer<?> getSerializer(int col);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
index da7748e..39aa1be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounterFactory.java
@@ -26,5 +26,7 @@ public interface BitmapCounterFactory {
 
     BitmapCounter newBitmap(int... values);
 
+    BitmapCounter newBitmap(long counter);
+
     BitmapCounter newBitmap(ByteBuffer in) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
index c1b260d..e2ec4cc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java
@@ -28,6 +28,9 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
     private static final BitmapCounterFactory factory = RoaringBitmapCounterFactory.INSTANCE;
     private static final BitmapCounter DELEGATE = factory.newBitmap();
 
+    private static final int IS_RESULT_FLAG = 1;
+    private static final int RESULT_SIZE = 12;
+
     // called by reflection
     public BitmapSerializer(DataType type) {
     }
@@ -44,8 +47,12 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
     @Override
     public BitmapCounter deserialize(ByteBuffer in) {
         try {
-            return factory.newBitmap(in);
-
+            if (peekLength(in) == RESULT_SIZE) {
+                int flag = in.getInt();
+                return factory.newBitmap(in.getLong());
+            } else {
+                return factory.newBitmap(in);
+            }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -53,7 +60,12 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
 
     @Override
     public int peekLength(ByteBuffer in) {
-        return DELEGATE.peekLength(in);
+        ByteBuffer buffer = in.slice();
+        if (buffer.getInt(0) == IS_RESULT_FLAG) {
+            return RESULT_SIZE;
+        } else {
+            return DELEGATE.peekLength(in);
+        }
     }
 
     @Override
@@ -71,4 +83,23 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> {
         // It's difficult to decide the size before data was ingested, comparing with HLLCounter(16) as 64KB, here is assumption
         return 8 * 1024;
     }
+
+    @Override
+    public boolean supportDirectReturnResult() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer getFinalResult(ByteBuffer in) {
+        ByteBuffer out = ByteBuffer.allocate(RESULT_SIZE);
+        try {
+            BitmapCounter counter = factory.newBitmap(in);
+            out.putInt(IS_RESULT_FLAG);
+            out.putLong(counter.getCount());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        out.flip();
+        return out;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
index eec45f2..9929e24 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
@@ -35,6 +35,7 @@ import java.util.Iterator;
 public class RoaringBitmapCounter implements BitmapCounter, Serializable {
 
     private ImmutableRoaringBitmap bitmap;
+    private Long counter;
 
     RoaringBitmapCounter() {
         bitmap = new MutableRoaringBitmap();
@@ -44,6 +45,11 @@ public class RoaringBitmapCounter implements BitmapCounter, Serializable {
         this.bitmap = bitmap;
     }
 
+    RoaringBitmapCounter(long counter) {
+        this.counter = counter;
+    }
+
+
     private MutableRoaringBitmap getMutableBitmap() {
         if (bitmap instanceof MutableRoaringBitmap) {
             return (MutableRoaringBitmap) bitmap;
@@ -86,6 +92,10 @@ public class RoaringBitmapCounter implements BitmapCounter, Serializable {
 
     @Override
     public long getCount() {
+        if (counter != null) {
+            return counter;
+        }
+
         return bitmap.getCardinality();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
index 822afa2..8ab908a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
@@ -40,6 +40,11 @@ public class RoaringBitmapCounterFactory implements BitmapCounterFactory, Serial
     }
 
     @Override
+    public BitmapCounter newBitmap(long counter) {
+        return new RoaringBitmapCounter(counter);
+    }
+
+    @Override
     public BitmapCounter newBitmap(ByteBuffer in) throws IOException {
         RoaringBitmapCounter counter = new RoaringBitmapCounter();
         counter.readFields(in);

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
index a4a35a4..2de38c0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -89,6 +89,19 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.
         throw new UnsupportedOperationException();
     }
 
+    /** If the query is an exact aggregation and has some memory-hungry measures,
+     * the final result can be returned directly to speed up the query.
+     * A DataTypeSerializer that supports this should return true here and override the getFinalResult method;
+     * besides that, its deserialize and peekLength methods should also support the final-result form, like {@link org.apache.kylin.measure.bitmap.BitmapSerializer}. Returns false by default. */
+    public boolean supportDirectReturnResult() {
+        return false;
+    }
+
+    /** An optional method that converts an expensive buffer into a lightweight buffer containing the final result (for memory-hungry measures); this default implementation throws UnsupportedOperationException */
+    public ByteBuffer getFinalResult(ByteBuffer in) {
+        throw new UnsupportedOperationException();
+    }
+
     /** Convert from obj to string */
     public String toString(T value) {
         if (value == null)

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index f89fc47..a21446a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -44,6 +44,7 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -135,6 +136,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
         context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
 
+        // exactAggregation means: no aggregation is needed at either the storage or the query engine.
+        boolean exactAggregation = isExactAggregation(context, cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation, sqlDigest.aggregations);
+        context.setExactAggregation(exactAggregation);
+
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
         Set<TblColRef> loosenedColumnD = Sets.newHashSet();
         Set<TblColRef> filterColumnD = Sets.newHashSet();
@@ -428,7 +433,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             sublist.get(0).getFunction().getMeasureType().adjustSqlDigest(sublist, sqlDigest);
         }
     }
-
+    
     private TupleFilter checkHavingCanPushDown(TupleFilter havingFilter, Set<TblColRef> groupsD, List<FunctionDesc> aggregations, Set<FunctionDesc> metrics) {
         // must have only one segment
         Segments<CubeSegment> readySegs = cubeInstance.getSegments(SegmentStatusEnum.READY);
@@ -465,4 +470,50 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         return havingFilter;
     }
 
+    private boolean isExactAggregation(StorageContext context, Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation, Collection<FunctionDesc> functionDescs) {
+        if (context.isNeedStorageAggregation()) {
+            logger.info("exactAggregation is false because need storage aggregation");
+            return false;
+        }
+
+        if (cuboid.requirePostAggregation()) {
+            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+            return false;
+        }
+
+        // derived columns needing post aggregation rule it out, unless they are all already in the group by
+        if (groups.containsAll(derivedPostAggregation) == false) {
+            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+            return false;
+        }
+
+        // other columns (from the filter) rule it out, unless every one of them is ensured to have a single value
+        if (singleValuesD.containsAll(othersD) == false) {
+            logger.info("exactAggregation is false because some column not on group by: " + othersD //
+                    + " (single value column: " + singleValuesD + ")");
+            return false;
+        }
+
+        // for DimensionAsMetric like max(cal_dt), the dimension column may not be in the real group by
+        for (FunctionDesc functionDesc : functionDescs) {
+            if (functionDesc.isDimensionAsMetric()) {
+                logger.info("exactAggregation is false because has DimensionAsMetric");
+                return false;
+            }
+        }
+
+        // for a partitioned cube, the partition column must be in the group by or have a single value
+        PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
+        if (partDesc.isPartitioned()) {
+            TblColRef col = partDesc.getPartitionDateColumnRef();
+            if (!groups.contains(col) && !singleValuesD.contains(col)) {
+                logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
+                return false;
+            }
+        }
+
+        logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
+        return true;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index e822ada..af8754d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -163,6 +163,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         }
         builder.setSpillEnabled(cubeSeg.getConfig().getQueryCoprocessorSpillEnabled());
         builder.setMaxScanBytes(cubeSeg.getConfig().getPartitionMaxScanBytes());
+        builder.setIsExactAggregate(storageContext.isExactAggregation());
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
             executorService.submit(new Runnable() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 951e2ef..a8f4fd8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -193,7 +193,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
             }
         };
 
-        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false);
+        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize(), false, storageContext.isExactAggregation());
         IGTScanner rawScanner = store.scan(scanRequest);
 
         final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 631e8e8..4ec0c9d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -44,18 +45,19 @@ public class HBaseReadonlyStore implements IGTStore {
     private List<List<Integer>> hbaseColumnsToGT;
     private int rowkeyPreambleSize;
     private boolean withDelay = false;
-
+    private boolean isExactAggregation;
 
     /**
      * @param withDelay is for test use
      */
-    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay) {
+    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT, int rowkeyPreambleSize, boolean withDelay, boolean isExactAggregation) {
         this.cellListIterator = cellListIterator;
         this.info = gtScanRequest.getInfo();
         this.hbaseColumns = hbaseColumns;
         this.hbaseColumnsToGT = hbaseColumnsToGT;
         this.rowkeyPreambleSize = rowkeyPreambleSize;
         this.withDelay = withDelay;
+        this.isExactAggregation = isExactAggregation;
     }
 
     @Override
@@ -132,6 +134,12 @@ public class HBaseReadonlyStore implements IGTStore {
                             buf = byteBuffer(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                             oneRecord.loadColumns(hbaseColumnsToGT.get(i), buf);
                         }
+
+
+                        if (isExactAggregation && getDirectReturnResultColumns().size() > 0) {
+                            trimGTRecord(oneRecord);
+                        }
+
                         return oneRecord;
 
                     }
@@ -145,6 +153,27 @@ public class HBaseReadonlyStore implements IGTStore {
                         return ByteBuffer.wrap(array, offset, length);
                     }
 
+                    private List<Integer> getDirectReturnResultColumns() {
+                        List<Integer> columns = new ArrayList<>();
+                        for (int i = 0; i < info.getColumnCount(); i++) {
+                            if (info.getCodeSystem().getSerializer(i).supportDirectReturnResult()) {
+                                columns.add(i);
+                            }
+                        }
+                        return columns;
+                    }
+
+                    private void trimGTRecord(GTRecord record) {
+                        List<Integer> directReturnResultColumns = getDirectReturnResultColumns();
+                        for (Integer i : directReturnResultColumns) {
+                            ByteBuffer recordBuffer = record.get(i).asBuffer();
+                            if (recordBuffer!= null) {
+                                ByteBuffer trimmedBuffer = info.getCodeSystem().getSerializer(i).getFinalResult(recordBuffer);
+                                record.loadColumns(i, trimmedBuffer);
+                            }
+                        }
+                    }
+
                 };
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3b26b71..bb6555c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -295,7 +295,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
                     !request.hasMaxScanBytes() ? Long.MAX_VALUE : request.getMaxScanBytes(), // for new client
                     scanReq.getTimeout());
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn());
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize(), behavior.delayToggledOn(), request.getIsExactAggregate());
 
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, behavior.filterToggledOn(), behavior.aggrToggledOn(), false, request.getSpillEnabled());

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
index b9f2771..4c662c9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/generated/CubeVisitProtos.java
@@ -123,7 +123,7 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     boolean hasMaxScanBytes();
@@ -131,10 +131,20 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     long getMaxScanBytes();
+
+    // optional bool isExactAggregate = 9 [default = false];
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    boolean hasIsExactAggregate();
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    boolean getIsExactAggregate();
   }
   /**
    * Protobuf type {@code CubeVisitRequest}
@@ -230,6 +240,11 @@ public final class CubeVisitProtos {
               maxScanBytes_ = input.readInt64();
               break;
             }
+            case 72: {
+              bitField0_ |= 0x00000080;
+              isExactAggregate_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -982,7 +997,7 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     public boolean hasMaxScanBytes() {
@@ -992,13 +1007,29 @@ public final class CubeVisitProtos {
      * <code>optional int64 maxScanBytes = 8;</code>
      *
      * <pre>
-     * 0 means no limit
+     * must be positive
      * </pre>
      */
     public long getMaxScanBytes() {
       return maxScanBytes_;
     }
 
+    // optional bool isExactAggregate = 9 [default = false];
+    public static final int ISEXACTAGGREGATE_FIELD_NUMBER = 9;
+    private boolean isExactAggregate_;
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    public boolean hasIsExactAggregate() {
+      return ((bitField0_ & 0x00000080) == 0x00000080);
+    }
+    /**
+     * <code>optional bool isExactAggregate = 9 [default = false];</code>
+     */
+    public boolean getIsExactAggregate() {
+      return isExactAggregate_;
+    }
+
     private void initFields() {
       gtScanRequest_ = com.google.protobuf.ByteString.EMPTY;
       hbaseRawScan_ = com.google.protobuf.ByteString.EMPTY;
@@ -1008,6 +1039,7 @@ public final class CubeVisitProtos {
       queryId_ = "";
       spillEnabled_ = true;
       maxScanBytes_ = 0L;
+      isExactAggregate_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1061,6 +1093,9 @@ public final class CubeVisitProtos {
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeInt64(8, maxScanBytes_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        output.writeBool(9, isExactAggregate_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1102,6 +1137,10 @@ public final class CubeVisitProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(8, maxScanBytes_);
       }
+      if (((bitField0_ & 0x00000080) == 0x00000080)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(9, isExactAggregate_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1162,6 +1201,11 @@ public final class CubeVisitProtos {
         result = result && (getMaxScanBytes()
             == other.getMaxScanBytes());
       }
+      result = result && (hasIsExactAggregate() == other.hasIsExactAggregate());
+      if (hasIsExactAggregate()) {
+        result = result && (getIsExactAggregate()
+            == other.getIsExactAggregate());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1207,6 +1251,10 @@ public final class CubeVisitProtos {
         hash = (37 * hash) + MAXSCANBYTES_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getMaxScanBytes());
       }
+      if (hasIsExactAggregate()) {
+        hash = (37 * hash) + ISEXACTAGGREGATE_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getIsExactAggregate());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1337,6 +1385,8 @@ public final class CubeVisitProtos {
         bitField0_ = (bitField0_ & ~0x00000040);
         maxScanBytes_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000080);
+        isExactAggregate_ = false;
+        bitField0_ = (bitField0_ & ~0x00000100);
         return this;
       }
 
@@ -1402,6 +1452,10 @@ public final class CubeVisitProtos {
           to_bitField0_ |= 0x00000040;
         }
         result.maxScanBytes_ = maxScanBytes_;
+        if (((from_bitField0_ & 0x00000100) == 0x00000100)) {
+          to_bitField0_ |= 0x00000080;
+        }
+        result.isExactAggregate_ = isExactAggregate_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1469,6 +1523,9 @@ public final class CubeVisitProtos {
         if (other.hasMaxScanBytes()) {
           setMaxScanBytes(other.getMaxScanBytes());
         }
+        if (other.hasIsExactAggregate()) {
+          setIsExactAggregate(other.getIsExactAggregate());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -2068,7 +2125,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public boolean hasMaxScanBytes() {
@@ -2078,7 +2135,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public long getMaxScanBytes() {
@@ -2088,7 +2145,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public Builder setMaxScanBytes(long value) {
@@ -2101,7 +2158,7 @@ public final class CubeVisitProtos {
        * <code>optional int64 maxScanBytes = 8;</code>
        *
        * <pre>
-       * 0 means no limit
+       * must be positive
        * </pre>
        */
       public Builder clearMaxScanBytes() {
@@ -2111,6 +2168,39 @@ public final class CubeVisitProtos {
         return this;
       }
 
+      // optional bool isExactAggregate = 9 [default = false]; builder-side value, presence is tracked by bit 0x00000100 of bitField0_
+      private boolean isExactAggregate_ ;
+      /** Reports whether isExactAggregate is marked as set on this builder, i.e. bit 0x00000100 of bitField0_ is on.
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public boolean hasIsExactAggregate() {
+        return ((bitField0_ & 0x00000100) == 0x00000100);
+      }
+      /** Returns the value currently stored in isExactAggregate_ ; the presence bit is not consulted here.
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public boolean getIsExactAggregate() {
+        return isExactAggregate_;
+      }
+      /** Stores {@code value}, marks the field as set (bit 0x00000100), calls onChanged() and returns this builder.
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public Builder setIsExactAggregate(boolean value) {
+        bitField0_ |= 0x00000100;
+        isExactAggregate_ = value;
+        onChanged();
+        return this;
+      }
+      /** Unsets the field: clears bit 0x00000100, resets the value to false, calls onChanged() and returns this builder.
+       * <code>optional bool isExactAggregate = 9 [default = false];</code>
+       */
+      public Builder clearIsExactAggregate() {
+        bitField0_ = (bitField0_ & ~0x00000100);
+        isExactAggregate_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:CubeVisitRequest)
     }
 
@@ -5516,32 +5606,33 @@ public final class CubeVisitProtos {
     java.lang.String[] descriptorData = {
       "\npstorage-hbase/src/main/java/org/apache" +
       "/kylin/storage/hbase/cube/v2/coprocessor" +
-      "/endpoint/protobuf/CubeVisit.proto\"\205\002\n\020C" +
+      "/endpoint/protobuf/CubeVisit.proto\"\246\002\n\020C" +
       "ubeVisitRequest\022\025\n\rgtScanRequest\030\001 \002(\014\022\024" +
       "\n\014hbaseRawScan\030\002 \002(\014\022\032\n\022rowkeyPreambleSi" +
       "ze\030\003 \002(\005\0223\n\020hbaseColumnsToGT\030\004 \003(\0132\031.Cub" +
       "eVisitRequest.IntList\022\027\n\017kylinProperties" +
       "\030\005 \002(\t\022\017\n\007queryId\030\006 \001(\t\022\032\n\014spillEnabled\030" +
-      "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\032\027\n\007Int" +
-      "List\022\014\n\004ints\030\001 \003(\005\"\253\004\n\021CubeVisitResponse",
-      "\022\026\n\016compressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132" +
-      "\030.CubeVisitResponse.Stats\022/\n\terrorInfo\030\003" +
-      " \001(\0132\034.CubeVisitResponse.ErrorInfo\032\220\002\n\005S" +
-      "tats\022\030\n\020serviceStartTime\030\001 \001(\003\022\026\n\016servic" +
-      "eEndTime\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022" +
-      "\032\n\022aggregatedRowCount\030\004 \001(\003\022\025\n\rsystemCpu" +
-      "Load\030\005 \001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001" +
-      "(\001\022\031\n\021freeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostnam" +
-      "e\030\010 \001(\t\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplet" +
-      "e\030\n \001(\005\022\024\n\014scannedBytes\030\013 \001(\003\032H\n\tErrorIn",
-      "fo\022*\n\004type\030\001 \002(\0162\034.CubeVisitResponse.Err" +
-      "orType\022\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014" +
-      "UNKNOWN_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_" +
-      "LIMIT_EXCEEDED\020\0022F\n\020CubeVisitService\0222\n\t" +
-      "visitCube\022\021.CubeVisitRequest\032\022.CubeVisit" +
-      "ResponseB`\nEorg.apache.kylin.storage.hba" +
-      "se.cube.v2.coprocessor.endpoint.generate" +
-      "dB\017CubeVisitProtosH\001\210\001\001\240\001\001"
+      "\007 \001(\010:\004true\022\024\n\014maxScanBytes\030\010 \001(\003\022\037\n\020isE" +
+      "xactAggregate\030\t \001(\010:\005false\032\027\n\007IntList\022\014\n",
+      "\004ints\030\001 \003(\005\"\253\004\n\021CubeVisitResponse\022\026\n\016com" +
+      "pressedRows\030\001 \002(\014\022\'\n\005stats\030\002 \002(\0132\030.CubeV" +
+      "isitResponse.Stats\022/\n\terrorInfo\030\003 \001(\0132\034." +
+      "CubeVisitResponse.ErrorInfo\032\220\002\n\005Stats\022\030\n" +
+      "\020serviceStartTime\030\001 \001(\003\022\026\n\016serviceEndTim" +
+      "e\030\002 \001(\003\022\027\n\017scannedRowCount\030\003 \001(\003\022\032\n\022aggr" +
+      "egatedRowCount\030\004 \001(\003\022\025\n\rsystemCpuLoad\030\005 " +
+      "\001(\001\022\036\n\026freePhysicalMemorySize\030\006 \001(\001\022\031\n\021f" +
+      "reeSwapSpaceSize\030\007 \001(\001\022\020\n\010hostname\030\010 \001(\t" +
+      "\022\016\n\006etcMsg\030\t \001(\t\022\026\n\016normalComplete\030\n \001(\005",
+      "\022\024\n\014scannedBytes\030\013 \001(\003\032H\n\tErrorInfo\022*\n\004t" +
+      "ype\030\001 \002(\0162\034.CubeVisitResponse.ErrorType\022" +
+      "\017\n\007message\030\002 \002(\t\"G\n\tErrorType\022\020\n\014UNKNOWN" +
+      "_TYPE\020\000\022\013\n\007TIMEOUT\020\001\022\033\n\027RESOURCE_LIMIT_E" +
+      "XCEEDED\020\0022F\n\020CubeVisitService\0222\n\tvisitCu" +
+      "be\022\021.CubeVisitRequest\032\022.CubeVisitRespons" +
+      "eB`\nEorg.apache.kylin.storage.hbase.cube" +
+      ".v2.coprocessor.endpoint.generatedB\017Cube" +
+      "VisitProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5553,7 +5644,7 @@ public final class CubeVisitProtos {
           internal_static_CubeVisitRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_CubeVisitRequest_descriptor,
-              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", });
+              new java.lang.String[] { "GtScanRequest", "HbaseRawScan", "RowkeyPreambleSize", "HbaseColumnsToGT", "KylinProperties", "QueryId", "SpillEnabled", "MaxScanBytes", "IsExactAggregate", });
           internal_static_CubeVisitRequest_IntList_descriptor =
             internal_static_CubeVisitRequest_descriptor.getNestedTypes().get(0);
           internal_static_CubeVisitRequest_IntList_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/kylin/blob/1f40225d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
index aa83595..8ca8756 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/protobuf/CubeVisit.proto
@@ -38,6 +38,7 @@ message CubeVisitRequest {
     optional string queryId = 6;
     optional bool spillEnabled = 7 [default = true];
     optional int64 maxScanBytes = 8; // must be positive
+    optional bool isExactAggregate = 9 [default = false];
     message IntList {
         repeated int32 ints = 1;
     }