You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/05/25 12:39:49 UTC

incubator-kylin git commit: KYLIN-770 optimize InMemCubeBuilder memory usage

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 9c0b3a954 -> 47c6ccd5c


KYLIN-770 optimize InMemCubeBuilder memory usage


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/47c6ccd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/47c6ccd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/47c6ccd5

Branch: refs/heads/0.8.0
Commit: 47c6ccd5c573ae791eccdf084217a1742eefa35b
Parents: 9c0b3a9
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue May 19 14:48:41 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Mon May 25 18:39:23 2015 +0800

----------------------------------------------------------------------
 .../job/hadoop/cubev2/InMemCubeBuilder.java     | 142 +++++++++++-------
 .../cubev2/InMemCubeBuilderBenchmarkTest.java   | 117 +++++++++++++++
 pom.xml                                         |   8 +
 .../storage/gridtable/GTAggregateScanner.java   | 150 ++++++++++++++++---
 .../kylin/storage/gridtable/GTRecord.java       |  39 +++--
 .../kylin/storage/gridtable/MemoryChecker.java  |  29 ++++
 .../gridtable/memstore/GTSimpleMemStore.java    |   2 +
 7 files changed, 403 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
index cff3474..56989a6 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
@@ -56,22 +56,20 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CubeGridTable;
 import org.apache.kylin.storage.gridtable.*;
-import org.apache.kylin.storage.util.SizeOfUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.*;
 
 /**
  */
 public class InMemCubeBuilder implements Runnable {
 
-    //estimation of (size of aggregation cache) / (size of mem store)
-    private static final double AGGREGATION_CACHE_FACTOR = 3;
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+    private static final int DEFAULT_TIMEOUT = 25;
 
     private BlockingQueue<List<String>> queue;
     private CubeDesc desc = null;
@@ -86,7 +84,6 @@ public class InMemCubeBuilder implements Runnable {
     private int[] hbaseMeasureRefIndex;
     private MeasureDesc[] measureDescs;
     private int measureCount;
-    private boolean hasDependentMeasure = false;
 
     protected IGTRecordWriter gtRecordWriter;
 
@@ -148,8 +145,6 @@ public class InMemCubeBuilder implements Runnable {
             }
         }
 
-        this.hasDependentMeasure = dependentMeasures.size() > 0;
-
         this.measureDescs = desc.getMeasures().toArray(new MeasureDesc[measureCount]);
     }
 
@@ -161,8 +156,7 @@ public class InMemCubeBuilder implements Runnable {
         return gridTable;
     }
 
-    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, boolean inMem) throws IOException {
-        logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
         Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
         BitSet parentDimensions = columnBitSets.getFirst();
         BitSet measureColumns = columnBitSets.getSecond();
@@ -183,14 +177,14 @@ public class InMemCubeBuilder implements Runnable {
             mask = mask >> 1;
         }
 
-        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns, inMem);
+        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
 
     }
 
-    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns, boolean inMem) throws IOException {
+    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
         GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
         IGTScanner scanner = gridTable.scan(req);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId, inMem);
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId, true);
         GTBuilder builder = newGridTable.rebuild();
 
         BitSet allNeededColumns = new BitSet();
@@ -215,7 +209,7 @@ public class InMemCubeBuilder implements Runnable {
                     newRecord.set(index, record.get(i));
                 }
 
-                if(hasDependentMeasure) {
+                if(dependentMeasures.size() > 0) {
                     // update measures which have 'dependent_measure_ref'
                     newRecord.getValues(dependentMetrics, hllObjects);
 
@@ -309,7 +303,7 @@ public class InMemCubeBuilder implements Runnable {
     public void run() {
         try {
             logger.info("Create base cuboid " + baseCuboidId);
-            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, true);
+            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, false);
 
             GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
             final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
@@ -392,6 +386,7 @@ public class InMemCubeBuilder implements Runnable {
                     createNDCuboidGT(tree, baseCuboidId, childId);
                 }
             }
+            outputGT(baseCuboidId, baseCuboidGT);
             dropStore(baseCuboidGT);
 
         } catch (IOException e) {
@@ -411,55 +406,96 @@ public class InMemCubeBuilder implements Runnable {
         record.setValues(recordValues);
     }
 
-    private long checkMemory(long threshold) {
-        final long freeMemory = Runtime.getRuntime().freeMemory();
-        logger.info("available memory:" + (freeMemory >> 10) + " KB, memory needed:" + (threshold >> 10) + " KB");
-        return freeMemory - threshold;
-    }
-
     private boolean gc(TreeNode<GridTable> parentNode) {
-        final long parentCuboidMem = SizeOfUtil.deepSizeOf(parentNode.data.getStore());
-        long threshold = (long) (parentCuboidMem * (AGGREGATION_CACHE_FACTOR + 1));
         final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
-        long memoryLeft = checkMemory(threshold);
+        logger.info("trying to select node to flush to disk, from:" + StringUtils.join(",", gridTables));
         for (TreeNode<GridTable> gridTable : gridTables) {
-            if (memoryLeft >= 0) {
+            final GTComboStore store = (GTComboStore) gridTable.data.getStore();
+            if (store.memoryUsage() > 0) {
+                logger.info("cuboid id:" + gridTable.id + " flush to disk");
+                long t = System.currentTimeMillis();
+                store.switchToDiskStore();
+                logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
+                waitForGc();
                 return true;
+            }
+        }
+        logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk");
+        return false;
+
+    }
+
+    private GridTable createChildCuboid(final GridTable parentCuboid, final long parentCuboidId, final long cuboidId) {
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final Future<GridTable> task = executorService.submit(new Callable<GridTable>() {
+            @Override
+            public GridTable call() throws Exception {
+                return aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+            }
+        });
+        try {
+            final GridTable gridTable = task.get(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            return gridTable;
+        } catch (InterruptedException e) {
+            throw new RuntimeException("this should not happen", e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof OutOfMemoryError) {
+                logger.warn("Future.get() OutOfMemory, stop the thread");
             } else {
-                logger.info("memory is low, try to select one node to flush to disk from:" + StringUtils.join(",", gridTables));
-                final GTComboStore store = (GTComboStore) gridTable.data.getStore();
-                if (store.memoryUsage() > 0) {
-                    final long storeSize = SizeOfUtil.deepSizeOf(store);
-                    memoryLeft += storeSize;
-                    logger.info("cuboid id:" + gridTable.id + " selected, memory used:" + (storeSize >> 10) + " KB");
-                    long t = System.currentTimeMillis();
-                    ((GTComboStore) store).switchToDiskStore();
-                    logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
-                }
+                throw new RuntimeException("this should not happen", e);
             }
+        } catch (TimeoutException e) {
+            logger.warn("Future.get() timeout, stop the thread");
         }
-        if (memoryLeft >= 0) {
-            return true;
-        } else {
-            logger.warn("all ancestor nodes of " + parentNode.id + " has been flushed to disk, memory is still insufficient, usually due to jvm gc not finished, forced to use memory store");
-            return true;
+        logger.info("shutdown executor service");
+        final List<Runnable> runnables = executorService.shutdownNow();
+        try {
+            executorService.awaitTermination(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
+            waitForGc();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("this should not happen", e);
         }
+        return null;
+
+    }
 
+    private void waitForGc() {
+        System.gc();
+        logger.info("wait 5 seconds for gc");
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("should not happen", e);
+        }
     }
 
     private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
 
         long startTime = System.currentTimeMillis();
 
-        GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
-        if (parentStore.memoryUsage() <= 0) {
-            long t = System.currentTimeMillis();
-            parentStore.switchToMemStore();
-            logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+//        GTComboStore parentStore = (GTComboStore) parentNode.data.getStore();
+//        if (parentStore.memoryUsage() <= 0) {
+//            long t = System.currentTimeMillis();
+//            parentStore.switchToMemStore();
+//            logger.info("node " + parentNode.id + " switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
+//        }
+
+        GridTable currentCuboid;
+        while (true) {
+            logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
+            currentCuboid = createChildCuboid(parentNode.data, parentCuboidId, cuboidId);
+            if (currentCuboid != null) {
+                break;
+            } else {
+                logger.warn("create child cuboid:" + cuboidId + " from parent:" + parentCuboidId + " failed, prepare to gc");
+                if (gc(parentNode)) {
+                    continue;
+                } else {
+                    logger.warn("all parent node has been flushed into disk, memory is still insufficient");
+                    throw new RuntimeException("all parent node has been flushed into disk, memory is still insufficient");
+                }
+            }
         }
-
-        boolean inMem = gc(parentNode);
-        GridTable currentCuboid = aggregateCuboid(parentNode.data, parentCuboidId, cuboidId, inMem);
         SimpleGridTableTree node = new SimpleGridTableTree();
         node.parent = parentNode;
         node.data = currentCuboid;
@@ -476,13 +512,15 @@ public class InMemCubeBuilder implements Runnable {
             }
         }
 
-        startTime = System.currentTimeMillis();
+
         //output the grid table
         outputGT(cuboidId, currentCuboid);
         dropStore(currentCuboid);
         parentNode.children.remove(node);
-        logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-
+        if (parentNode.children.size() > 0) {
+            logger.info("cuboid:" + cuboidId + " has finished, parent node:" + parentNode.id + " need to switch to mem store");
+            ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+        }
     }
 
     private void dropStore(GridTable gt) throws IOException {
@@ -491,11 +529,13 @@ public class InMemCubeBuilder implements Runnable {
 
 
     private void outputGT(Long cuboidId, GridTable gridTable) throws IOException {
+        long startTime = System.currentTimeMillis();
         GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
         IGTScanner scanner = gridTable.scan(req);
         for (GTRecord record : scanner) {
             this.gtRecordWriter.write(cuboidId, record);
         }
+        logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
     }
 
     private static class TreeNode<T> {
@@ -506,7 +546,7 @@ public class InMemCubeBuilder implements Runnable {
 
         List<TreeNode<T>> getAncestorList() {
             ArrayList<TreeNode<T>> result = Lists.newArrayList();
-            TreeNode<T> parent = this.parent;
+            TreeNode<T> parent = this;
             while (parent != null) {
                 result.add(parent);
                 parent = parent.parent;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
new file mode 100644
index 0000000..e90a41b
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilderBenchmarkTest.java
@@ -0,0 +1,117 @@
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class InMemCubeBuilderBenchmarkTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderBenchmarkTest.class);
+
+    private static final int BENCHMARK_RECORD_LIMIT = 2000000;
+    private static final String CUBE_NAME = "test_kylin_cube_with_slr_1_new_segment";
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    private Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeSegment cubeSegment) {
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+        final CubeDesc desc = cubeSegment.getCubeDesc();
+        for (DimensionDesc dim : desc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (desc.getRowkey().isUseDictionary(col)) {
+                    Dictionary dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        throw new IllegalArgumentException("Dictionary for " + col + " was not found.");
+                    }
+                    logger.info("Dictionary for " + col + " was put into dictionary map.");
+                    dictionaryMap.put(col, cubeSegment.getDictionary(col));
+                }
+            }
+        }
+        return dictionaryMap;
+    }
+
+    private static class ConsoleGTRecordWriter implements IGTRecordWriter {
+
+        boolean verbose = false;
+
+        @Override
+        public void write(Long cuboidId, GTRecord record) throws IOException {
+            if (verbose)
+                System.out.println(record.toString());
+        }
+    }
+
+    private void loadDataFromLocalFile(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("../table.txt")));
+        String line;
+        int counter = 0;
+        while ((line = br.readLine()) != null) {
+            queue.put(Arrays.asList(line.split("\t")));
+            counter++;
+            if (counter == BENCHMARK_RECORD_LIMIT) {
+                break;
+            }
+        }
+        queue.put(Collections.emptyList());
+    }
+
+    private void loadDataFromRandom(LinkedBlockingQueue queue) throws IOException, InterruptedException {
+        queue.put(Collections.emptyList());
+    }
+
+
+    @Test
+    public void test() throws Exception {
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        final CubeInstance cube = cubeManager.getCube(CUBE_NAME);
+        final CubeSegment cubeSegment = cube.getFirstSegment();
+
+        LinkedBlockingQueue queue = new LinkedBlockingQueue<List<String>>();
+
+        InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube, getDictionaryMap(cubeSegment), new ConsoleGTRecordWriter());
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        Future<?> future = executorService.submit(cubeBuilder);
+        loadDataFromLocalFile(queue);
+        future.get();
+        logger.info("stream build finished");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb091a9..2e9b4f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,8 @@
         <commons-configuration.version>1.9</commons-configuration.version>
         <commons-daemon.version>1.0.15</commons-daemon.version>
         <commons-httpclient.version>3.1</commons-httpclient.version>
+        <commons-collections4.version>4.0</commons-collections4.version>
+
 
         <!-- Utility -->
         <log4j.version>1.2.17</log4j.version>
@@ -338,6 +340,11 @@
                 <version>${commons-httpclient.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-collections4</artifactId>
+                <version>${commons-collections4.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
@@ -609,6 +616,7 @@
 
                                 <exclude>**/Kafka*Test.java</exclude>
                                 <exclude>**/RequesterTest.java</exclude>
+                                <exclude>**/InMemCubeBuilderBenchmarkTest.java</exclude>
                             </excludes>
                             <systemProperties>
                                 <property>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index 14a3efa..f3de4f6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -1,18 +1,23 @@
 package org.apache.kylin.storage.gridtable;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-
-import com.google.common.collect.Maps;
-
 public class GTAggregateScanner implements IGTScanner {
 
+    private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
     final GTInfo info;
     final BitSet dimensions; // dimensions to return, can be more than group by
     final BitSet groupBy;
@@ -20,6 +25,7 @@ public class GTAggregateScanner implements IGTScanner {
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
 
+
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
         if (req.hasAggregation() == false)
             throw new IllegalStateException();
@@ -55,13 +61,119 @@ public class GTAggregateScanner implements IGTScanner {
 
     @Override
     public Iterator<GTRecord> iterator() {
-        AggregationCache aggrCache = new AggregationCache();
+        AggregationCacheWithBytesKey aggregationCacheWithBytesKey = new AggregationCacheWithBytesKey();
         for (GTRecord r : inputScanner) {
-            aggrCache.aggregate(r);
+            aggregationCacheWithBytesKey.aggregate(r);
+            MemoryChecker.checkMemory();
         }
-        return aggrCache.iterator();
+        return aggregationCacheWithBytesKey.iterator();
     }
 
+    class AggregationCacheWithBytesKey {
+        final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+
+        public AggregationCacheWithBytesKey() {
+            aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
+                @Override
+                public int compare(byte[] o1, byte[] o2) {
+                    int result = 0;
+                    Preconditions.checkArgument(o1.length == o2.length);
+                    final int length = o1.length;
+                    for (int i = 0; i < length; ++i) {
+                        result = o1[i] - o2[i];
+                        if (result == 0) {
+                            continue;
+                        } else {
+                            return result;
+                        }
+                    }
+                    return result;
+                }
+            });
+        }
+
+        private byte[] createKey(GTRecord record) {
+            byte[] result = new byte[info.getMaxColumnLength(groupBy)];
+            int offset = 0;
+            for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+                final ByteArray byteArray = record.cols[i];
+                final int columnLength = info.codeSystem.maxCodeLength(i);
+                System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, columnLength);
+                offset += columnLength;
+            }
+            assert offset == result.length;
+            return result;
+        }
+
+        void aggregate(GTRecord r) {
+            final byte[] key = createKey(r);
+            MeasureAggregator[] aggrs = aggBufMap.get(key);
+            if (aggrs == null) {
+                aggrs = new MeasureAggregator[metricsAggrFuncs.length];
+                for (int i = 0, col = -1; i < aggrs.length; i++) {
+                    col = metrics.nextSetBit(col + 1);
+                    aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
+                }
+                aggBufMap.put(key, aggrs);
+            }
+            for (int i = 0, col = -1; i < aggrs.length; i++) {
+                col = metrics.nextSetBit(col + 1);
+                Object metrics = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
+                aggrs[i].aggregate(metrics);
+            }
+        }
+
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+
+                final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+                final GTRecord secondRecord;
+
+                {
+                    BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
+                    dimensionsAndMetrics.or(metrics);
+                    secondRecord = new GTRecord(info, dimensionsAndMetrics);
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return it.hasNext();
+                }
+
+                @Override
+                public GTRecord next() {
+                    Entry<byte[], MeasureAggregator[]> entry = it.next();
+                    create(entry.getKey(), entry.getValue());
+                    return secondRecord;
+                }
+
+                private void create(byte[] key, MeasureAggregator[] value) {
+                    int offset = 0;
+                    for (int i = groupBy.nextSetBit(0); i >= 0; i = groupBy.nextSetBit(i + 1)) {
+                        final int columnLength = info.codeSystem.maxCodeLength(i);
+                        secondRecord.set(i, new ByteArray(key, offset, columnLength));
+                        offset += columnLength;
+                    }
+                    metricsBuf.clear();
+                    for (int i = 0, col = -1; i < value.length; i++) {
+                        col = metrics.nextSetBit(col + 1);
+                        int pos = metricsBuf.position();
+                        info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
+                        secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+                    }
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+    /*
     @SuppressWarnings({ "rawtypes", "unchecked" })
     class AggregationCache {
         final SortedMap<GTRecord, MeasureAggregator[]> aggBufMap;
@@ -92,9 +204,16 @@ public class GTAggregateScanner implements IGTScanner {
         public Iterator<GTRecord> iterator() {
             return new Iterator<GTRecord>() {
                 
-                Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
-                ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
-                GTRecord oneRecord = new GTRecord(info); // avoid instance creation
+                final Iterator<Entry<GTRecord, MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+
+                final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+                final GTRecord oneRecord;  // avoid instance creation
+
+                {
+                    BitSet dimensionsAndMetrics = (BitSet) groupBy.clone();
+                    dimensionsAndMetrics.or(metrics);
+                    oneRecord = new GTRecord(info, dimensionsAndMetrics);
+                }
 
                 @Override
                 public boolean hasNext() {
@@ -104,7 +223,7 @@ public class GTAggregateScanner implements IGTScanner {
                 @Override
                 public GTRecord next() {
                     Entry<GTRecord, MeasureAggregator[]> entry = it.next();
-                    
+
                     GTRecord dims = entry.getKey();
                     for (int i = dimensions.nextSetBit(0); i >= 0; i = dimensions.nextSetBit(i + 1)) {
                         oneRecord.cols[i].set(dims.cols[i]);
@@ -118,7 +237,6 @@ public class GTAggregateScanner implements IGTScanner {
                         info.codeSystem.encodeColumnValue(col, aggrs[i].getState(), metricsBuf);
                         oneRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
                     }
-                    
                     return oneRecord;
                 }
 
@@ -129,12 +247,6 @@ public class GTAggregateScanner implements IGTScanner {
             };
         }
 
-        public long getSize() {
-            return aggBufMap.size();
-        }
-
-        // ============================================================================
-        
         transient int rowMemBytes;
         static final int MEMORY_USAGE_CAP = 500 * 1024 * 1024; // 500 MB
 
@@ -158,4 +270,6 @@ public class GTAggregateScanner implements IGTScanner {
         }
     }
 
+    */
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
index 3d0f9bb..0e5c1b7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
@@ -1,12 +1,12 @@
 package org.apache.kylin.storage.gridtable;
 
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.BitSet;
 
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.metadata.filter.IFilterCodeSystem;
-
 public class GTRecord implements Comparable<GTRecord> {
 
     final GTInfo info;
@@ -14,12 +14,19 @@ public class GTRecord implements Comparable<GTRecord> {
 
     private BitSet maskForEqualHashComp;
 
-    public GTRecord(GTInfo info) {
+    public GTRecord(GTInfo info, BitSet maskForEqualHashComp) {
         this.info = info;
         this.cols = new ByteArray[info.getColumnCount()];
-        for (int i = 0; i < cols.length; i++)
-            this.cols[i] = new ByteArray();
-        this.maskForEqualHashComp = info.colAll;
+        for (int i = 0; i < cols.length; i++) {
+            if (maskForEqualHashComp.get(i)) {
+                this.cols[i] = new ByteArray();
+            }
+        }
+        this.maskForEqualHashComp = maskForEqualHashComp;
+    }
+
+    public GTRecord(GTInfo info) {
+        this(info, info.colAll);
     }
 
     public GTInfo getInfo() {
@@ -62,10 +69,11 @@ public class GTRecord implements Comparable<GTRecord> {
     public Object[] getValues(BitSet selectedColumns, Object[] result) {
         assert selectedColumns.cardinality() <= result.length;
         for (int i = 0, c = selectedColumns.nextSetBit(0); c >= 0; i++, c = selectedColumns.nextSetBit(c + 1)) {
-            if (cols[c].array() == null)
+            if (cols[c] == null || cols[c].array() == null) {
                 result[i] = null;
-            else
+            } else {
                 result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+            }
         }
         return result;
     }
@@ -74,10 +82,11 @@ public class GTRecord implements Comparable<GTRecord> {
         assert selectedColumns.length <= result.length;
         for (int i = 0; i < selectedColumns.length; i++) {
             int c = selectedColumns[i];
-            if (cols[c].array() == null)
+            if (cols[c] == null || cols[c].array() == null) {
                 result[i] = null;
-            else
+            } else {
                 result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+            }
         }
         return result;
     }
@@ -94,8 +103,7 @@ public class GTRecord implements Comparable<GTRecord> {
 
         byte[] space = new byte[len];
 
-        GTRecord copy = new GTRecord(info);
-        copy.maskForEqualHashComp = this.maskForEqualHashComp;
+        GTRecord copy = new GTRecord(info, this.maskForEqualHashComp);
         int pos = 0;
         for (int i = selectedCols.nextSetBit(0); i >= 0; i = selectedCols.nextSetBit(i + 1)) {
             System.arraycopy(cols[i].array(), cols[i].offset(), space, pos, cols[i].length());
@@ -129,8 +137,9 @@ public class GTRecord implements Comparable<GTRecord> {
         if (this.maskForEqualHashComp != o.maskForEqualHashComp)
             return false;
         for (int i = maskForEqualHashComp.nextSetBit(0); i >= 0; i = maskForEqualHashComp.nextSetBit(i + 1)) {
-            if (this.cols[i].equals(o.cols[i]) == false)
+            if (this.cols[i].equals(o.cols[i]) == false) {
                 return false;
+            }
         }
         return true;
     }
@@ -161,7 +170,7 @@ public class GTRecord implements Comparable<GTRecord> {
 
     @Override
     public String toString() {
-        return toString(info.colAll);
+        return toString(maskForEqualHashComp);
     }
     
     public String toString(BitSet selectedColumns) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
new file mode 100644
index 0000000..2dac8fe
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/MemoryChecker.java
@@ -0,0 +1,29 @@
+package org.apache.kylin.storage.gridtable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Throws OutOfMemoryError when the current thread is interrupted or heap headroom is at or below 80 MB. */
+public final class MemoryChecker {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryChecker.class);
+
+    private static final long MEMORY_THRESHOLD = 80L << 20; // 80 MB
+
+    private MemoryChecker() {
+    }
+
+    public static void checkMemory() {
+        if (Thread.currentThread().isInterrupted()) {
+            logger.info("thread interrupted");
+            throw new OutOfMemoryError("thread interrupted");
+        }
+        final Runtime rt = Runtime.getRuntime();
+        // freeMemory() excludes heap the JVM can still grow into, so measure against maxMemory().
+        final long freeMem = rt.maxMemory() - (rt.totalMemory() - rt.freeMemory());
+        if (freeMem <= MEMORY_THRESHOLD) {
+            throw new OutOfMemoryError("free memory:" + freeMem + " is lower than " + MEMORY_THRESHOLD);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/47c6ccd5/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index a4d0b8d..c97fe39 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -11,6 +11,7 @@ import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTRowBlock;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.IGTStore;
+import org.apache.kylin.storage.gridtable.MemoryChecker;
 
 public class GTSimpleMemStore implements IGTStore {
 
@@ -73,6 +74,7 @@ public class GTSimpleMemStore implements IGTStore {
             } else {
                 assert id == rowBlockList.size();
                 rowBlockList.add(copy);
+                MemoryChecker.checkMemory();
             }
         }
     }