You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ga...@apache.org on 2017/03/16 03:01:41 UTC

kylin git commit: KYLIN-2501 Stream Aggregate GTRecords at Query Server

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2501 [created] 10fc77660


KYLIN-2501 Stream Aggregate GTRecords at Query Server


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/10fc7766
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/10fc7766
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/10fc7766

Branch: refs/heads/KYLIN-2501
Commit: 10fc776604d199893d1ed42a1172ad258b560978
Parents: 5155994
Author: gaodayue <ga...@meituan.com>
Authored: Wed Mar 15 22:45:02 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Wed Mar 15 22:45:02 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   4 +
 .../kylin/common/util/ImmutableBitSet.java      |  29 ++-
 .../org/apache/kylin/GTForwardingScanner.java   |  56 +++++
 .../kylin/cube/gridtable/CubeGridTable.java     |  18 --
 .../gridtable/CuboidToGridTableMapping.java     |  18 ++
 .../cube/inmemcubing/InMemCubeBuilder.java      |   6 +-
 .../kylin/gridtable/GTAggregateScanner.java     |  16 +-
 .../apache/kylin/gridtable/GTFilterScanner.java |  22 +-
 .../org/apache/kylin/gridtable/GTRecord.java    |  80 +++----
 .../apache/kylin/gridtable/GTScanRequest.java   |  13 ++
 .../gridtable/GTStreamAggregateScanner.java     | 211 +++++++++++++++++++
 .../kylin/gridtable/GTScanReqSerDerTest.java    |   4 +-
 .../apache/kylin/storage/StorageContext.java    |  20 ++
 .../storage/gtrecord/CubeScanRangePlanner.java  |   3 +-
 .../storage/gtrecord/CubeSegmentScanner.java    |   7 +-
 .../storage/gtrecord/CubeTupleConverter.java    |  31 +--
 .../gtrecord/GTCubeStorageQueryBase.java        |  38 +++-
 .../kylin/storage/gtrecord/ITupleConverter.java |   3 +-
 .../gtrecord/PartitionResultIterator.java       |  59 ++++++
 .../storage/gtrecord/PartitionResultMerger.java | 100 +++++++++
 .../kylin/storage/gtrecord/ScannerWorker.java   |   5 +-
 .../gtrecord/SegmentCubeTupleIterator.java      |  72 ++++++-
 .../gtrecord/StorageResponseGTScatter.java      |  82 +++----
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     |   7 +-
 .../storage/hbase/cube/v2/CubeHBaseRPC.java     |   5 +-
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java |   5 +-
 26 files changed, 704 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 02349ad..9cd35c8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -802,6 +802,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.valueOf(getOptional("kylin.query.skip-empty-segments", "true"));
     }
 
+    public boolean isStreamAggregateEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", "true"));
+    }
+
     @Deprecated //Limit is good even it's large. This config is meaning less since we already have scan threshold 
     public int getStoragePushDownLimitMax() {
         return Integer.parseInt(getOptional("kylin.query.max-limit-pushdown", "10000"));

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index b417877..5cdf08c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -19,8 +19,9 @@ package org.apache.kylin.common.util;
 
 import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.Iterator;
 
-public class ImmutableBitSet {
+public class ImmutableBitSet implements Iterable<Integer> {
 
     public static final ImmutableBitSet EMPTY = new ImmutableBitSet(new BitSet());
 
@@ -168,4 +169,30 @@ public class ImmutableBitSet {
             return new ImmutableBitSet(bitSet);
         }
     };
+
+    /**
+     * Iterate over the positions of true value.
+     * @return the iterator
+     */
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            int index = 0;
+
+            @Override
+            public boolean hasNext() {
+                return index < arr.length;
+            }
+
+            @Override
+            public Integer next() {
+                return arr[index++];
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
new file mode 100644
index 0000000..de8c88d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/GTForwardingScanner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin;
+
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link IGTScanner} which forwards all its method calls to another scanner.
+ *
+ * @see <a href="http://en.wikipedia.org/wiki/Decorator_pattern">decorator pattern</a>.
+ */
+public class GTForwardingScanner implements IGTScanner {
+    protected IGTScanner delegated;
+
+    protected GTForwardingScanner(IGTScanner delegated) {
+        this.delegated = checkNotNull(delegated, "delegated");
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return delegated.getInfo();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegated.close();
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return delegated.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index 563cf43..5cee9df 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -18,29 +18,11 @@
 
 package org.apache.kylin.cube.gridtable;
 
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.IDimensionEncodingMap;
 import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.metadata.model.TblColRef;
 
 public class CubeGridTable {
-
-    public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) {
-        Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId);
-        return newGTInfo(cuboid, new CubeDimEncMap(cubeSeg));
-    }
-
-    public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) {
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
-        return newGTInfo(cuboid, new CubeDimEncMap(cubeDesc, dictionaryMap));
-    }
-
     public static GTInfo newGTInfo(Cuboid cuboid, IDimensionEncodingMap dimEncMap) {
         CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 2e5dd12..6879687 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -140,11 +140,29 @@ public class CuboidToGridTableMapping {
         return i == null ? -1 : i.intValue();
     }
 
+    public int[] getDimIndexes(Collection<TblColRef> dims) {
+        int[] result = new int[dims.size()];
+        int i = 0;
+        for (TblColRef dim : dims) {
+            result[i++] = getIndexOf(dim);
+        }
+        return result;
+    }
+
     public int getIndexOf(FunctionDesc metric) {
         Integer r = metrics2gt.get(metric);
         return r == null ? -1 : r;
     }
 
+    public int[] getMetricsIndexes(Collection<FunctionDesc> metrics) {
+        int[] result = new int[metrics.size()];
+        int i = 0;
+        for (FunctionDesc metric : metrics) {
+            result[i++] = getIndexOf(metric);
+        }
+        return result;
+    }
+
     public List<TblColRef> getCuboidDimensionsInGTOrder() {
         return cuboid.getColumns();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e08844e..a26e948 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -38,6 +38,7 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTAggregateScanner;
 import org.apache.kylin.gridtable.GTBuilder;
@@ -108,7 +109,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
-        GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
+        GTInfo info = CubeGridTable.newGTInfo(
+                Cuboid.findById(cubeDesc, cuboidID),
+                new CubeDimEncMap(cubeDesc, dictionaryMap)
+        );
 
         // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
         // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 7cdd4f5..0dd6fa9 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -45,7 +45,6 @@ import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.metadata.datatype.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +62,7 @@ public class GTAggregateScanner implements IGTScanner {
     final ImmutableBitSet metrics;
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
+    final BufferedMeasureCodec measureCodec;
     final AggregationCache aggrCache;
     final long spillThreshold; // 0 means no memory control && no spill
     final int storagePushDownLimit;//default to be Int.MAX
@@ -86,6 +86,7 @@ public class GTAggregateScanner implements IGTScanner {
         this.metrics = req.getAggrMetrics();
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
         this.inputScanner = inputScanner;
+        this.measureCodec = req.createMeasureCodec();
         this.aggrCache = new AggregationCache();
         this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
         this.aggrMask = new boolean[metricsAggrFuncs.length];
@@ -175,7 +176,6 @@ public class GTAggregateScanner implements IGTScanner {
         final int keyLength;
         final boolean[] compareMask;
         boolean compareAll = true;
-        final BufferedMeasureCodec measureCodec;
 
         final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
             @Override
@@ -213,18 +213,6 @@ public class GTAggregateScanner implements IGTScanner {
             keyLength = compareMask.length;
             dumps = Lists.newArrayList();
             aggBufMap = createBuffMap();
-            measureCodec = createMeasureCodec();
-        }
-
-        private BufferedMeasureCodec createMeasureCodec() {
-            DataType[] types = new DataType[metrics.trueBitCount()];
-            for (int i = 0; i < types.length; i++) {
-                types[i] = info.getColumnType(metrics.trueBitAt(i));
-            }
-
-            BufferedMeasureCodec result = new BufferedMeasureCodec(types);
-            result.setBufferSize(info.getMaxColumnLength(metrics));
-            return result;
         }
 
         private boolean[] createCompareMask() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
index 717f89c..cad0a04 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
+import org.apache.kylin.GTForwardingScanner;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -33,17 +34,16 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 
-public class GTFilterScanner implements IGTScanner {
+public class GTFilterScanner extends GTForwardingScanner {
 
-    final private IGTScanner inputScanner;
     final private TupleFilter filter;
     final private IFilterCodeSystem<ByteArray> filterCodeSystem;
     final private IEvaluatableTuple oneTuple; // avoid instance creation
 
     private GTRecord next = null;
 
-    public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
-        this.inputScanner = inputScanner;
+    public GTFilterScanner(IGTScanner delegated, GTScanRequest req) throws IOException {
+        super(delegated);
         this.filter = req.getFilterPushDown();
         this.filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
         this.oneTuple = new IEvaluatableTuple() {
@@ -53,25 +53,15 @@ public class GTFilterScanner implements IGTScanner {
             }
         };
 
-        if (TupleFilter.isEvaluableRecursively(filter) == false)
+        if (!TupleFilter.isEvaluableRecursively(filter))
             throw new IllegalArgumentException();
     }
 
     @Override
-    public GTInfo getInfo() {
-        return inputScanner.getInfo();
-    }
-
-    @Override
-    public void close() throws IOException {
-        inputScanner.close();
-    }
-
-    @Override
     public Iterator<GTRecord> iterator() {
         return new Iterator<GTRecord>() {
 
-            private Iterator<GTRecord> inputIterator = inputScanner.iterator();
+            private Iterator<GTRecord> inputIterator = delegated.iterator();
             private FilterResultCache resultCache = new FilterResultCache(getInfo(), filter);
 
             @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index f4480c8..3397adc 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -21,7 +21,6 @@ package org.apache.kylin.gridtable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -46,18 +45,21 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
         }
         this.info = info;
     }
-
-    public GTRecord(GTRecord other) {
-        this.info = other.info;
-        this.cols = new ByteArray[info.getColumnCount()];
-        for (int i = 0; i < other.cols.length; i++) {
-            this.cols[i] = other.cols[i].copy();
+    
+    @Override
+    public GTRecord clone() { // deep copy
+        ByteArray[] cols = new ByteArray[this.cols.length];
+        for (int i = 0; i < cols.length; i++) {
+            cols[i] = this.cols[i].copy();
         }
+        return new GTRecord(this.info, cols);
     }
 
-    @Override
-    public Object clone() {
-        return new GTRecord(this);
+    public void shallowCopyFrom(GTRecord source) {
+        assert info == source.info;
+        for (int i = 0; i < cols.length; i++) {
+            cols[i].set(source.cols[i]);
+        }
     }
 
     public GTInfo getInfo() {
@@ -106,30 +108,18 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
     /** decode and return the values of this record */
     public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) {
         assert selectedCols.cardinality() == result.length;
-
         for (int i = 0; i < selectedCols.trueBitCount(); i++) {
-            int c = selectedCols.trueBitAt(i);
-            if (cols[c] == null || cols[c].array() == null) {
-                result[i] = null;
-            } else {
-                result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
-            }
+            result[i] = decodeValue(selectedCols.trueBitAt(i));
         }
         return result;
     }
 
-    /** decode and return the values of this record */
-    public Object[] getValues(int[] selectedColumns, Object[] result) {
-        assert selectedColumns.length <= result.length;
-        for (int i = 0; i < selectedColumns.length; i++) {
-            int c = selectedColumns[i];
-            if (cols[c].array() == null) {
-                result[i] = null;
-            } else {
-                result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
-            }
+    public Object decodeValue(int c) {
+        ByteArray col = cols[c];
+        if (col != null && col.array() != null) {
+            return info.codeSystem.decodeColumnValue(c, col.asBuffer());
         }
-        return result;
+        return null;
     }
 
     public int sizeOf(ImmutableBitSet selectedCols) {
@@ -198,19 +188,13 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
         return compareToInternal(o, info.colAll);
     }
 
-    public int compareToOnPrimaryKey(GTRecord o) {
-        return compareToInternal(o, info.primaryKey);
-    }
-
-    public static Comparator<GTRecord> getPrimaryKeyComparator() {
+    public static Comparator<GTRecord> getComparator(final ImmutableBitSet participateCols) {
         return new Comparator<GTRecord>() {
-            @Override
             public int compare(GTRecord o1, GTRecord o2) {
                 if (o1 == null || o2 == null) {
                     throw new IllegalStateException("Cannot handle null");
                 }
-
-                return o1.compareToOnPrimaryKey(o2);
+                return o1.compareToInternal(o2, participateCols);
             }
         };
     }
@@ -287,26 +271,14 @@ public class GTRecord implements Comparable<GTRecord>, Cloneable {
         loadColumns(info.colBlocks[c], buf);
     }
 
-    /** change pointers to point to data in given buffer, UNLIKE deserialize */
-    public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
-        int pos = buf.position();
-        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
-            int c = selectedCols.trueBitAt(i);
-            int len = info.codeSystem.codeLength(c, buf);
-            cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
-            pos += len;
-            buf.position(pos);
-        }
-    }
-
-    /** change pointers to point to data in given buffer, UNLIKE deserialize
-     *  unlike loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf), this
-     *  method allows to defined specific columns(in order) to load
+    /**
+     * Change pointers to point to data in given buffer, UNLIKE deserialize
+     * @param selectedCols positions of column to load
+     * @param buf buffer containing continuous data of selected columns
      */
-    public void loadColumns(List<Integer> selectedCols, ByteBuffer buf) {
+    public void loadColumns(Iterable<Integer> selectedCols, ByteBuffer buf) {
         int pos = buf.position();
-        for (int i = 0; i < selectedCols.size(); i++) {
-            int c = selectedCols.get(i);
+        for (int c : selectedCols) {
             int len = info.codeSystem.codeLength(c, buf);
             cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
             pos += len;

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 4629c8e..ae35d2b 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -31,6 +31,8 @@ import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.SerializeToByteBuffer;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -202,6 +204,17 @@ public class GTScanRequest {
 
     }
 
+    public BufferedMeasureCodec createMeasureCodec() {
+        DataType[] metricTypes = new DataType[aggrMetrics.trueBitCount()];
+        for (int i = 0; i < metricTypes.length; i++) {
+            metricTypes[i] = info.getColumnType(aggrMetrics.trueBitAt(i));
+        }
+
+        BufferedMeasureCodec codec = new BufferedMeasureCodec(metricTypes);
+        codec.setBufferSize(info.getMaxColumnLength(aggrMetrics));
+        return codec;
+    }
+
     public boolean isDoingStorageAggregation() {
         return doingStorageAggregation;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
new file mode 100644
index 0000000..1fde423
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTStreamAggregateScanner.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.kylin.GTForwardingScanner;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * GTStreamAggregateScanner requires input records to be sorted on group fields.
+ * In such cases, it's superior to hash/sort based aggregator because it can produce
+ * ordered outputs on the fly and the memory consumption is very low.
+ */
+public class GTStreamAggregateScanner extends GTForwardingScanner {
+    private final GTScanRequest req;
+    private final Comparator<GTRecord> keyComparator;
+
+    public GTStreamAggregateScanner(IGTScanner delegated,
+            GTScanRequest req, Comparator<GTRecord> keyComparator) {
+        super(delegated);
+        this.req = req;
+        this.keyComparator = keyComparator;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return new StreamMergeGTRecordIterator(delegated.iterator());
+    }
+
+    public Iterator<Object[]> valuesIterator(int[] gtDimsIdx, int[] gtMetricsIdx) {
+        return new StreamMergeValuesIterator(delegated.iterator(), gtDimsIdx, gtMetricsIdx);
+    }
+
+    private abstract class AbstractStreamMergeIterator<E> implements Iterator<E> {
+        final PeekingIterator<GTRecord> input;
+        final IGTCodeSystem codeSystem;
+        final ImmutableBitSet dimensions;
+        final ImmutableBitSet metrics;
+        final String[] metricFuncs;
+        final BufferedMeasureCodec measureCodec;
+
+        private final GTRecord first; // reuse to avoid object creation
+
+        AbstractStreamMergeIterator(Iterator<GTRecord> input) {
+            this.input = Iterators.peekingIterator(input);
+            this.codeSystem = req.getInfo().getCodeSystem();
+            this.dimensions = req.getDimensions();
+            this.metrics = req.getAggrMetrics();
+            this.metricFuncs = req.getAggrMetricsFuncs();
+            this.measureCodec = req.createMeasureCodec();
+
+            this.first = new GTRecord(req.getInfo());
+        }
+
+        @Override
+        public boolean hasNext() {
+            return input.hasNext();
+        }
+
+        private boolean isSameKey(GTRecord o1, GTRecord o2) {
+            return keyComparator.compare(o1, o2) == 0;
+        }
+
+        private boolean shouldMergeNext(GTRecord current) {
+            return input.hasNext() && isSameKey(current, input.peek());
+        }
+
+        protected abstract E finalizeResult(GTRecord record);
+
+        protected abstract E finalizeResult(GTRecord record, Object[] aggStates);
+
+        @Override
+        public E next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            // WATCH OUT! record returned by "input" scanner could be changed later,
+            // so we must make a shallow copy of it.
+            first.shallowCopyFrom(input.next());
+
+            // shortcut to avoid extra deserialize/serialize cost
+            if (!shouldMergeNext(first)) {
+                return finalizeResult(first);
+            }
+            // merge records with the same key
+            MeasureAggregator[] aggrs = codeSystem.newMetricsAggregators(metrics, metricFuncs);
+            aggregate(aggrs, first);
+            aggregate(aggrs, input.next()); // no need to copy record because it's not referred to later
+            while (shouldMergeNext(first)) {
+                aggregate(aggrs, input.next());
+            }
+
+            Object[] aggStates = new Object[aggrs.length];
+            for (int i = 0; i < aggStates.length; i++) {
+                aggStates[i] = aggrs[i].getState();
+            }
+            return finalizeResult(first, aggStates);
+        }
+
+        @SuppressWarnings("unchecked")
+        protected void aggregate(MeasureAggregator[] aggregators, GTRecord record) {
+            for (int i = 0; i < aggregators.length; i++) {
+                int c = metrics.trueBitAt(i);
+                Object metric = codeSystem.decodeColumnValue(c, record.cols[c].asBuffer());
+                aggregators[i].aggregate(metric);
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove");
+        }
+    }
+
+    private class StreamMergeGTRecordIterator extends AbstractStreamMergeIterator<GTRecord> {
+
+        private GTRecord returnRecord; // avoid object creation
+
+        StreamMergeGTRecordIterator(Iterator<GTRecord> input) {
+            super(input);
+            this.returnRecord = new GTRecord(req.getInfo());
+        }
+
+        @Override
+        protected GTRecord finalizeResult(GTRecord record) {
+            return record;
+        }
+
+        @Override
+        protected GTRecord finalizeResult(GTRecord record, Object[] aggStates) {
+            // 1. load dimensions
+            for (int c : dimensions) {
+                returnRecord.cols[c] = record.cols[c];
+            }
+            // 2. serialize metrics
+            byte[] bytes = measureCodec.encode(aggStates).array();
+            int[] sizes = measureCodec.getMeasureSizes();
+            // 3. load metrics
+            int offset = 0;
+            for (int i = 0; i < metrics.trueBitCount(); i++) {
+                int c = metrics.trueBitAt(i);
+                returnRecord.cols[c].set(bytes, offset, sizes[i]);
+                offset += sizes[i];
+            }
+            return returnRecord;
+        }
+    }
+
+    private class StreamMergeValuesIterator extends AbstractStreamMergeIterator<Object[]> {
+
+        private int[] gtDimsIdx;
+        private int[] gtMetricsIdx;
+        private Object[] result; // avoid object creation
+
+        StreamMergeValuesIterator(Iterator<GTRecord> input, int[] gtDimsIdx, int[] gtMetricsIdx) {
+            super(input);
+            this.gtDimsIdx = gtDimsIdx;
+            this.gtMetricsIdx = gtMetricsIdx;
+            result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+        }
+
+        private void decodeAndSetDimensions(GTRecord record) {
+            for (int i = 0; i < gtDimsIdx.length; i++) {
+                result[i] = record.decodeValue(gtDimsIdx[i]);
+            }
+        }
+
+        @Override
+        protected Object[] finalizeResult(GTRecord record) {
+            decodeAndSetDimensions(record);
+            // decode metrics
+            for (int i = 0; i < gtMetricsIdx.length; i++) {
+                result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+            }
+            return result;
+        }
+
+        @Override
+        protected Object[] finalizeResult(GTRecord record, Object[] aggStates) {
+            decodeAndSetDimensions(record);
+            // set metrics
+            for (int i = 0; i < gtMetricsIdx.length; i++) {
+                result[gtDimsIdx.length + i] = aggStates[i];
+            }
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
index 77cc2d8..1ae229a 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/GTScanReqSerDerTest.java
@@ -29,6 +29,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -90,7 +91,8 @@ public class GTScanReqSerDerTest extends LocalFileMetadataTestCase {
         CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("test_kylin_cube_with_slr_ready");
         CubeSegment segment = cube.getFirstSegment();
 
-        GTInfo info = CubeGridTable.newGTInfo(segment, Cuboid.getBaseCuboidId(cube.getDescriptor()));
+        Cuboid baseCuboid = Cuboid.getBaseCuboid(cube.getDescriptor());
+        GTInfo info = CubeGridTable.newGTInfo(baseCuboid, new CubeDimEncMap(segment));
         GTInfo.serializer.serialize(info, buffer);
         buffer.flip();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 998f1db..bb17054 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.storage;
 
+import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 import org.slf4j.Logger;
@@ -48,6 +50,9 @@ public class StorageContext {
     private boolean needStorageAggregation = false;
     private boolean enableCoprocessor = false;
 
+    private boolean enableStreamAggregate = false;
+    private Comparator<GTRecord> groupKeyComparator;
+
     private IStorageQuery storageQuery;
     private AtomicLong processedRowCount = new AtomicLong();
     private Cuboid cuboid;
@@ -230,4 +235,19 @@ public class StorageContext {
         this.storageQuery = storageQuery;
     }
 
+    public boolean isStreamAggregateEnabled() {
+        return enableStreamAggregate;
+    }
+
+    public void enableStreamAggregate() {
+        this.enableStreamAggregate = true;
+    }
+
+    public Comparator<GTRecord> getGroupKeyComparator() {
+        return groupKeyComparator;
+    }
+
+    public void setGroupKeyComparator(Comparator<GTRecord> groupKeyComparator) {
+        this.groupKeyComparator = groupKeyComparator;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 6911827..c3cc858 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -38,6 +38,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.RecordComparators;
 import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
 import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
@@ -85,7 +86,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         Set<TblColRef> filterDims = Sets.newHashSet();
         TupleFilter.collectColumns(filter, filterDims);
 
-        this.gtInfo = CubeGridTable.newGTInfo(cubeSegment, cuboid.getId());
+        this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment));
         CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
 
         IGTComparator comp = gtInfo.getCodeSystem().getComparator();

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 4f206d4..31a9f99 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -78,7 +78,7 @@ public class CubeSegmentScanner implements IGTScanner {
         }
         scanRequest = scanRangePlanner.planScanRequest();
         String gtStorage = ((GTCubeStorageQueryBase) context.getStorageQuery()).getGTStorage();
-        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage, context);
     }
 
     @Override
@@ -96,8 +96,7 @@ public class CubeSegmentScanner implements IGTScanner {
         return scanRequest == null ? null : scanRequest.getInfo();
     }
 
-    public CubeSegment getSegment() {
-        return this.cubeSeg;
+    public GTScanRequest getScanRequest() {
+        return scanRequest;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 280718f..b762e5c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -28,10 +28,8 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -43,7 +41,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
- * convert GTRecord to tuple
+ * Convert Object[] (decoded GTRecord) to tuple
  */
 public class CubeTupleConverter implements ITupleConverter {
 
@@ -54,7 +52,6 @@ public class CubeTupleConverter implements ITupleConverter {
 
     private final int[] gtColIdx;
     private final int[] tupleIdx;
-    private final Object[] gtValues;
     private final MeasureType<?>[] measureTypes;
 
     private final List<IAdvMeasureFiller> advMeasureFillers;
@@ -63,19 +60,16 @@ public class CubeTupleConverter implements ITupleConverter {
     private final int nSelectedDims;
 
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
-                              Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+                              Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx,
+                              TupleInfo returnTupleInfo) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
+        this.gtColIdx = gtColIdx;
         this.tupleInfo = returnTupleInfo;
         this.derivedColFillers = Lists.newArrayList();
 
-        List<TblColRef> cuboidDims = cuboid.getColumns();
-        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
         nSelectedDims = selectedDimensions.size();
-        gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
         tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
-        gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
 
         // measure types don't have this many, but aligned length make programming easier
         measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
@@ -89,21 +83,11 @@ public class CubeTupleConverter implements ITupleConverter {
 
         // pre-calculate dimension index mapping to tuple
         for (TblColRef dim : selectedDimensions) {
-            int dimIndex = mapping.getIndexOf(dim);
-            gtColIdx[i] = dimIndex;
             tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-
-            //            if (tupleIdx[iii] == -1) {
-            //                throw new IllegalStateException("dim not used in tuple:" + dim);
-            //            }
-
             i++;
         }
 
         for (FunctionDesc metric : selectedMetrics) {
-            int metricIndex = mapping.getIndexOf(metric);
-            gtColIdx[i] = metricIndex;
-
             if (metric.needRewrite()) {
                 String rewriteFieldName = metric.getRewriteFieldName();
                 tupleIdx[i] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
@@ -126,7 +110,7 @@ public class CubeTupleConverter implements ITupleConverter {
         }
 
         // prepare derived columns and filler
-        Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
+        Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboid.getColumns(), null);
         for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
             TblColRef[] hostCols = entry.getKey().data;
             for (DeriveInfo deriveInfo : entry.getValue()) {
@@ -148,9 +132,8 @@ public class CubeTupleConverter implements ITupleConverter {
     }
 
     @Override
-    public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
-
-        record.getValues(gtColIdx, gtValues);
+    public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple) {
+        assert gtValues.length == gtColIdx.length;
 
         // dimensions
         for (int i = 0; i < nSelectedDims; i++) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index ecf1ad3..82590a2 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -26,15 +26,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.RawQueryLastHacker;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -120,6 +123,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
         // set limit push down
         enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context);
+        // set whether to aggregate results from multiple partitions
+        enableStreamAggregateIfBeneficial(cuboid, groupsD, context);
         // set query deadline
         context.setDeadline(cubeInstance);
 
@@ -144,8 +149,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
     protected abstract String getGTStorage();
     
-    protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo tupleInfo) {
-        return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+    protected ITupleConverter newCubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, int[] gtColIdx, TupleInfo tupleInfo) {
+        return new CubeTupleConverter(cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
     }
 
     protected void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -366,6 +371,35 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
+    private void enableStreamAggregateIfBeneficial(Cuboid cuboid, Set<TblColRef> groupsD, StorageContext context) {
+        CubeDesc cubeDesc = cuboid.getCubeDesc();
+        boolean enabled = cubeDesc.getConfig().isStreamAggregateEnabled();
+
+        Set<TblColRef> shardByInGroups = Sets.newHashSet();
+        for (TblColRef col : cubeDesc.getShardByColumns()) {
+            if (groupsD.contains(col)) {
+                shardByInGroups.add(col);
+            }
+        }
+        if (!shardByInGroups.isEmpty()) {
+            enabled = false;
+            logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: " + shardByInGroups);
+        }
+
+        if (!context.isNeedStorageAggregation()) {
+            enabled = false;
+            logger.debug("Aggregate partition results is not beneficial because no storage aggregation");
+        }
+
+        if (enabled) {
+            CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+            ImmutableBitSet cols = mapping.makeGridTableColumns(groupsD);
+
+            context.enableStreamAggregate();
+            context.setGroupKeyComparator(GTRecord.getComparator(cols));
+        }
+    }
+
     protected void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
         Map<String, List<MeasureDesc>> map = Maps.newHashMap();
         for (MeasureDesc measure : cubeDesc.getMeasures()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index 9c50d0c..dd48e4d 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -20,11 +20,10 @@ package org.apache.kylin.storage.gtrecord;
 
 import java.util.List;
 
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.tuple.Tuple;
 
 public interface ITupleConverter {
 
-    public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple);
+    public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
new file mode 100644
index 0000000..474e1e0
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
+
+/**
+ * Support iterate over {@code GTRecord}s in storage partition result.
+ *
+ * <p>Note that the implementation returns the same object for next().
+ * Client needs to copy the returned record when needed.
+ */
+public class PartitionResultIterator extends UnmodifiableIterator<GTRecord> {
+    private final ByteBuffer buffer;
+    private final ImmutableBitSet cols;
+    private final GTRecord record; // reuse to avoid object creation
+
+    public PartitionResultIterator(byte[] data, GTInfo info, ImmutableBitSet cols) {
+        this.buffer = ByteBuffer.wrap(data);
+        this.cols = cols;
+        this.record = new GTRecord(info);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return buffer.hasRemaining();
+    }
+
+    @Override
+    public GTRecord next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        record.loadColumns(cols, buffer);
+        return record;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
new file mode 100644
index 0000000..52029d3
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PartitionResultMerger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.UnmodifiableIterator;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+
+/**
+ * Merge-sort {@code GTRecord}s in all partitions, assume each partition contains sorted elements.
+ */
+public class PartitionResultMerger implements Iterable<GTRecord> {
+    private final ImmutableList<PartitionResultIterator> partitionResults;
+    private final GTInfo info;
+    private final Comparator<GTRecord> comparator;
+
+    public PartitionResultMerger(
+            Iterable<PartitionResultIterator> partitionResults,
+            GTInfo info, Comparator<GTRecord> comparator) {
+        this.partitionResults = ImmutableList.copyOf(partitionResults);
+        this.info = info;
+        this.comparator = comparator;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        if (partitionResults.size() == 1) {
+            return partitionResults.get(0);
+        }
+        return new MergingResultsIterator();
+    }
+
+    private class MergingResultsIterator extends UnmodifiableIterator<GTRecord> {
+        final GTRecord record = new GTRecord(info); // reuse to avoid object creation
+
+        PriorityQueue<PeekingIterator<GTRecord>> heap;
+
+        MergingResultsIterator() {
+            Comparator<PeekingIterator<GTRecord>> heapComparator = new Comparator<PeekingIterator<GTRecord>>() {
+                public int compare(PeekingIterator<GTRecord> o1, PeekingIterator<GTRecord> o2) {
+                    return comparator.compare(o1.peek(), o2.peek());
+                }
+            };
+            this.heap = new PriorityQueue<>(partitionResults.size(), heapComparator);
+
+            for (PartitionResultIterator it : partitionResults) {
+                if (it.hasNext()) {
+                    heap.offer(Iterators.peekingIterator(it));
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !heap.isEmpty();
+        }
+
+        @Override
+        public GTRecord next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            // get smallest record
+            PeekingIterator<GTRecord> it = heap.poll();
+            // WATCH OUT! record got from PartitionResultIterator.next() may changed later,
+            // so we must make a shallow copy of it.
+            record.shallowCopyFrom(it.next());
+
+            if (it.hasNext()) {
+                heap.offer(it);
+            }
+
+            return record;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
index 9e89227..fe22e9c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ScannerWorker.java
@@ -30,6 +30,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStorage;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,7 +39,7 @@ public class ScannerWorker {
     private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
     private IGTScanner internal = null;
 
-    public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
+    public ScannerWorker(ISegment segment, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage, StorageContext context) {
         if (scanRequest == null) {
             logger.info("Segment {} will be skipped", segment);
             internal = new EmptyGTScanner();
@@ -48,7 +49,7 @@ public class ScannerWorker {
         final GTInfo info = scanRequest.getInfo();
 
         try {
-            IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class).newInstance(segment, cuboid, info); // default behavior
+            IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(ISegment.class, Cuboid.class, GTInfo.class, StorageContext.class).newInstance(segment, cuboid, info, context); // default behavior
             internal = rpc.getGTScanner(scanRequest);
         } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 37699a3..11f766c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -24,8 +24,14 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
+import com.google.common.collect.UnmodifiableIterator;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTStreamAggregateScanner;
+import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +55,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
     protected final Tuple tuple;
     protected final StorageContext context;
 
-    protected Iterator<GTRecord> gtItr;
+    protected Iterator<Object[]> gtValues;
     protected ITupleConverter cubeTupleConverter;
     protected Tuple next;
 
@@ -66,12 +72,62 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
         this.tupleInfo = returnTupleInfo;
         this.tuple = new Tuple(returnTupleInfo);
         this.context = context;
-        this.gtItr = getGTItr(scanner);
-        this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+        int[] gtDimsIdx = mapping.getDimIndexes(selectedDimensions);
+        int[] gtMetricsIdx = mapping.getMetricsIndexes(selectedMetrics);
+        // gtColIdx = gtDimsIdx + gtMetricsIdx
+        int[] gtColIdx = new int[gtDimsIdx.length + gtMetricsIdx.length];
+        System.arraycopy(gtDimsIdx, 0, gtColIdx, 0, gtDimsIdx.length);
+        System.arraycopy(gtMetricsIdx, 0, gtColIdx, gtDimsIdx.length, gtMetricsIdx.length);
+
+        this.gtValues = getGTValuesIterator(scanner.iterator(), scanner.getScanRequest(), gtDimsIdx, gtMetricsIdx);
+        this.cubeTupleConverter = ((GTCubeStorageQueryBase) context.getStorageQuery()).newCubeTupleConverter(
+                scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, gtColIdx, tupleInfo);
     }
 
-    private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) {
-        return scanner.iterator();
+    private Iterator<Object[]> getGTValuesIterator(
+            final Iterator<GTRecord> records, final GTScanRequest scanRequest,
+            final int[] gtDimsIdx, final int[] gtMetricsIdx) {
+
+        boolean singlePartitionResult = records instanceof PartitionResultIterator;
+        if (context.isStreamAggregateEnabled() && !singlePartitionResult) {
+            // input records are ordered, leverage stream aggregator to produce possibly fewer records
+            IGTScanner inputScanner = new IGTScanner() {
+                public GTInfo getInfo() {
+                    return scanRequest.getInfo();
+                }
+
+                public void close() throws IOException {}
+
+                public Iterator<GTRecord> iterator() {
+                    return records;
+                }
+            };
+            GTStreamAggregateScanner aggregator = new GTStreamAggregateScanner(
+                    inputScanner, scanRequest, context.getGroupKeyComparator());
+            return aggregator.valuesIterator(gtDimsIdx, gtMetricsIdx);
+        }
+
+        // simply decode records
+        return new UnmodifiableIterator<Object[]>() {
+            Object[] result = new Object[gtDimsIdx.length + gtMetricsIdx.length];
+
+            public boolean hasNext() {
+                return records.hasNext();
+            }
+
+            public Object[] next() {
+                GTRecord record = records.next();
+                for (int i = 0; i < gtDimsIdx.length; i++) {
+                    result[i] = record.decodeValue(gtDimsIdx[i]);
+                }
+                for (int i = 0; i < gtMetricsIdx.length; i++) {
+                    result[gtDimsIdx.length + i] = record.decodeValue(gtMetricsIdx[i]);
+                }
+                return result;
+            }
+        };
     }
 
     @Override
@@ -91,13 +147,13 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
         }
 
         // now we have a GTRecord
-        if (!gtItr.hasNext()) {
+        if (!gtValues.hasNext()) {
             return false;
         }
-        GTRecord curRecord = gtItr.next();
+        Object[] gtValues = this.gtValues.next();
 
         // translate into tuple
-        advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+        advMeasureFillers = cubeTupleConverter.translateResult(gtValues, tuple);
 
         // the simple case
         if (advMeasureFillers == null) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
index 1a80bbf..0f1e191 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/StorageResponseGTScatter.java
@@ -18,22 +18,20 @@
 
 package org.apache.kylin.storage.gtrecord;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import javax.annotation.Nullable;
-
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.storage.StorageContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 
 /**
  * scatter the blob returned from region server to a iterable of gtrecords
@@ -42,18 +40,20 @@ public class StorageResponseGTScatter implements IGTScanner {
 
     private static final Logger logger = LoggerFactory.getLogger(StorageResponseGTScatter.class);
 
-    private GTInfo info;
+    private final GTInfo info;
     private IPartitionStreamer partitionStreamer;
-    private Iterator<byte[]> blocks;
-    private ImmutableBitSet columns;
-    private int storagePushDownLimit = -1;
+    private final Iterator<byte[]> blocks;
+    private final ImmutableBitSet columns;
+    private final StorageContext context;
+    private final boolean needSorted; // whether scanner should return sorted records
 
-    public StorageResponseGTScatter(GTInfo info, IPartitionStreamer partitionStreamer, ImmutableBitSet columns, int storagePushDownLimit) {
-        this.info = info;
+    public StorageResponseGTScatter(GTScanRequest scanRequest, IPartitionStreamer partitionStreamer, StorageContext context) {
+        this.info = scanRequest.getInfo();
         this.partitionStreamer = partitionStreamer;
         this.blocks = partitionStreamer.asByteArrayIterator();
-        this.columns = columns;
-        this.storagePushDownLimit = storagePushDownLimit;
+        this.columns = scanRequest.getColumns();
+        this.context = context;
+        this.needSorted = (context.getFinalPushDownLimit() != Integer.MAX_VALUE) || context.isStreamAggregateEnabled();
     }
 
     @Override
@@ -69,48 +69,18 @@ public class StorageResponseGTScatter implements IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new EndpointResponseGTScatterFunc());
-        if (StorageContext.mergeSortPartitionResults(storagePushDownLimit)) {
-            logger.info("Using SortedIteratorMergerWithLimit to merge partition results");
-            return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
-        } else {
-            logger.info("Using Iterators.concat to merge partition results");
-            return Iterators.concat(shardSubsets);
+        List<PartitionResultIterator> partitionResults = Lists.newArrayList();
+        while (blocks.hasNext()) {
+            partitionResults.add(new PartitionResultIterator(blocks.next(), info, columns));
         }
-    }
-
-    class EndpointResponseGTScatterFunc implements Function<byte[], Iterator<GTRecord>> {
-        @Nullable
-        @Override
-        public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
-            return new Iterator<GTRecord>() {
-                private ByteBuffer inputBuffer = null;
-                //rotate between two buffer GTRecord to support SortedIteratorMergerWithLimit, which will peek one more GTRecord
-                private GTRecord firstRecord = null;
-
-                @Override
-                public boolean hasNext() {
-                    if (inputBuffer == null) {
-                        inputBuffer = ByteBuffer.wrap(input);
-                        firstRecord = new GTRecord(info);
-                    }
 
-                    return inputBuffer.position() < inputBuffer.limit();
-                }
-
-                @Override
-                public GTRecord next() {
-                    firstRecord.loadColumns(columns, inputBuffer);
-                    return firstRecord;
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
+        if (!needSorted) {
+            logger.debug("Using Iterators.concat to merge partition results");
+            return Iterators.concat(partitionResults.iterator());
         }
-    }
 
+        logger.debug("Using PartitionResultMerger to merge partition results");
+        PartitionResultMerger merger = new PartitionResultMerger(partitionResults, info, context.getGroupKeyComparator());
+        return merger.iterator();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 82b67b6..e822ada 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -47,6 +47,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gtrecord.DummyPartitionStreamer;
 import org.apache.kylin.storage.gtrecord.StorageResponseGTScatter;
 import org.apache.kylin.storage.hbase.HBaseConnection;
@@ -69,8 +70,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static ExecutorService executorService = new LoggableCachedThreadPool();
 
-    public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
-        super(segment, cuboid, fullGTInfo);
+    public CubeHBaseEndpointRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
+        super(segment, cuboid, fullGTInfo, context);
     }
 
     private byte[] getByteArrayForShort(short v) {
@@ -245,7 +246,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             });
         }
 
-        return new StorageResponseGTScatter(fullGTInfo, new DummyPartitionStreamer(epResultItr), scanRequest.getColumns(), scanRequest.getStoragePushDownLimit());
+        return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), storageContext);
     }
 
     private ByteString serializeGTScanReq(GTScanRequest scanRequest) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 88e7176..db81646 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -49,6 +49,7 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.IGTStorage;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,17 +65,19 @@ public abstract class CubeHBaseRPC implements IGTStorage {
     final protected Cuboid cuboid;
     final protected GTInfo fullGTInfo;
     final protected QueryContext queryContext;
+    final protected StorageContext storageContext;
 
     final private RowKeyEncoder fuzzyKeyEncoder;
     final private RowKeyEncoder fuzzyMaskEncoder;
 
-    public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo) {
+    public CubeHBaseRPC(ISegment segment, Cuboid cuboid, GTInfo fullGTInfo, StorageContext context) {
         Preconditions.checkArgument(segment instanceof CubeSegment, "segment must be CubeSegment");
         
         this.cubeSeg = (CubeSegment) segment;
         this.cuboid = cuboid;
         this.fullGTInfo = fullGTInfo;
         this.queryContext = QueryContext.current();
+        this.storageContext = context;
 
         this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
         this.fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);

http://git-wip-us.apache.org/repos/asf/kylin/blob/10fc7766/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 33f8d90..951e2ef 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,8 +88,8 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
         }
     }
 
-    public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo) {
-        super(segment, cuboid, fullGTInfo);
+    public CubeHBaseScanRPC(ISegment segment, Cuboid cuboid, final GTInfo fullGTInfo, StorageContext context) {
+        super(segment, cuboid, fullGTInfo, context);
     }
 
     @Override