You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/20 05:16:40 UTC

kylin git commit: KYLIN-1233 Spill to disk when AggregationCache needs too much memory

Repository: kylin
Updated Branches:
  refs/heads/2.0-rc e6b55540a -> cd30e48e3


KYLIN-1233 Spill to disk when AggregationCache needs too much memory


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd30e48e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd30e48e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd30e48e

Branch: refs/heads/2.0-rc
Commit: cd30e48e387e4de44428eb508740dff18642a9c4
Parents: e6b5554
Author: lidongsjtu <do...@ebay.com>
Authored: Sat Dec 19 22:19:13 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 20 11:47:42 2015 +0800

----------------------------------------------------------------------
 .../common/hll/HyperLogLogPlusCounter.java      |   2 +-
 .../org/apache/kylin/cube/util/KryoUtils.java   |   2 +-
 .../kylin/gridtable/GTAggregateScanner.java     | 306 ++++++++++++++++---
 .../apache/kylin/gridtable/UnitTestSupport.java |  44 ++-
 .../gridtable/AggregationCacheSpillTest.java    |  83 +++++
 5 files changed, 384 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index ef91509..11ae78f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -42,7 +42,7 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog
 
     private final int p;
     private final int m;
-    private final HashFunction hashFunc;
+    private transient final HashFunction hashFunc;
     byte[] registers;
 
     public HyperLogLogPlusCounter() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
index 9dbe0d2..48f925a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
@@ -48,7 +48,7 @@ public class KryoUtils {
         return deserialize(bytes, clazz);
     }
 
-    private static Kryo getKryo() {
+    public static Kryo getKryo() {
         if (_Kryo.get() == null) {
             Kryo kryo = new Kryo();
             kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 0ed320c..46d51a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -1,20 +1,33 @@
 package org.apache.kylin.gridtable;
 
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
+import java.util.PriorityQueue;
 import java.util.SortedMap;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.cube.util.KryoUtils;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -36,7 +49,7 @@ public class GTAggregateScanner implements IGTScanner {
     private MemoryWaterLevel memTracker;
 
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) {
-        if (req.hasAggregation() == false)
+        if (!req.hasAggregation())
             throw new IllegalStateException();
 
         this.info = inputScanner.getInfo();
@@ -49,6 +62,30 @@ public class GTAggregateScanner implements IGTScanner {
         this.enableMemCheck = enableMemCheck;
     }
 
+    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
+        // Aggregation cache is basically a tree map. The tree map entry overhead is
+        // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
+        // - 41~52 according to AggregationCacheMemSizeTest
+        return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
+    }
+
+    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
+        // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
+        // Memory alignment to 8 bytes
+        long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
+        for (MeasureAggregator aggr : aggrs) {
+            if (aggr != null)
+                est += aggr.getMemBytesEstimate();
+        }
+        return est;
+    }
+
+    public static long estimateSizeOf(byte[] bytes) {
+        // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
+        // Memory alignment to 8 bytes
+        return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
+    }
+
     public void trackMemoryLevel(MemoryWaterLevel tracker) {
         this.memTracker = tracker;
     }
@@ -66,6 +103,7 @@ public class GTAggregateScanner implements IGTScanner {
     @Override
     public void close() throws IOException {
         inputScanner.close();
+        aggrCache.close();
     }
 
     @Override
@@ -73,6 +111,8 @@ public class GTAggregateScanner implements IGTScanner {
         for (GTRecord r : inputScanner) {
             aggrCache.aggregate(r);
         }
+        logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache());
+        aggrCache.spillBuffMap();
         return aggrCache.iterator();
     }
 
@@ -81,35 +121,44 @@ public class GTAggregateScanner implements IGTScanner {
         return aggrCache.estimatedMemSize();
     }
 
-    class AggregationCache {
-        final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+    class AggregationCache implements Closeable {
+        final static double SPILL_THRESHOLD_GB = 0.5;
+
+        final List<Dump> dumps;
         final int keyLength;
         final boolean[] compareMask;
 
-        public AggregationCache() {
-            compareMask = createCompareMask();
-            keyLength = compareMask.length;
-            aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
-                @Override
-                public int compare(byte[] o1, byte[] o2) {
-                    int result = 0;
-                    // profiler shows this check is slow
-                    // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
-                    for (int i = 0; i < keyLength; ++i) {
-                        if (compareMask[i]) {
-                            int a = (o1[i] & 0xff);
-                            int b = (o2[i] & 0xff);
-                            result = a - b;
-                            if (result == 0) {
-                                continue;
-                            } else {
-                                return result;
-                            }
+        final Kryo kryo = KryoUtils.getKryo();
+
+        final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                int result = 0;
+                // profiler shows this check is slow
+                // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
+                for (int i = 0; i < keyLength; ++i) {
+                    if (compareMask[i]) {
+                        int a = (o1[i] & 0xff);
+                        int b = (o2[i] & 0xff);
+                        result = a - b;
+                        if (result == 0) {
+                            continue;
+                        } else {
+                            return result;
                         }
                     }
-                    return result;
                 }
-            });
+                return result;
+            }
+        };
+
+        SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+
+        public AggregationCache() {
+            compareMask = createCompareMask();
+            keyLength = compareMask.length;
+            dumps = Lists.newArrayList();
+            aggBufMap = createBuffMap();
         }
 
         private boolean[] createCompareMask() {
@@ -133,6 +182,10 @@ public class GTAggregateScanner implements IGTScanner {
             return mask;
         }
 
+        private SortedMap<byte[], MeasureAggregator[]> createBuffMap() {
+            return Maps.newTreeMap(bytesComparator);
+        }
+
         private byte[] createKey(GTRecord record) {
             byte[] result = new byte[keyLength];
             int offset = 0;
@@ -152,10 +205,13 @@ public class GTAggregateScanner implements IGTScanner {
                 if (memTracker != null) {
                     memTracker.markHigh();
                 }
-                long estimated = estimatedMemSize();
-                if (estimated > 10 * MemoryBudgetController.ONE_GB) {
-                    throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated);
-                }
+            }
+
+            // Here will spill to disk when aggBufMap used too large memory
+            long estimated = estimatedMemSize();
+            if (estimated > SPILL_THRESHOLD_GB * MemoryBudgetController.ONE_GB) {
+                logger.info("AggregationCache memory estimated size is: " + estimated);
+                spillBuffMap();
             }
 
             final byte[] key = createKey(r);
@@ -171,6 +227,28 @@ public class GTAggregateScanner implements IGTScanner {
             }
         }
 
+        private void spillBuffMap() throws RuntimeException {
+            try {
+                Dump dump = new Dump(aggBufMap);
+                dumps.add(dump); // register before flushing so close() also removes a partially written temp file
+                dump.flush();
+                aggBufMap = createBuffMap();
+            } catch (Exception e) {
+                throw new RuntimeException("AggregationCache spill failed: " + e.getMessage(), e);
+            }
+        }
+
+        @Override
+        public void close() throws RuntimeException {
+            RuntimeException failure = null; // rethrow the first failure, but only after every dump is terminated
+            for (Dump dump : dumps) {
+                try { dump.terminate(); } catch (Exception e) {
+                    if (failure == null) failure = new RuntimeException("AggregationCache close failed: " + e.getMessage(), e);
+                }
+            }
+            if (failure != null) throw failure;
+        }
+
         private MeasureAggregator[] newAggregators() {
             return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
         }
@@ -185,9 +263,11 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         public Iterator<GTRecord> iterator() {
+            final DumpMerger merger = new DumpMerger(dumps);
+
             return new Iterator<GTRecord>() {
 
-                final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+                final Iterator<Entry<byte[], MeasureAggregator[]>> it = merger.iterator();
 
                 final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
                 final GTRecord secondRecord = new GTRecord(info);
@@ -227,29 +307,157 @@ public class GTAggregateScanner implements IGTScanner {
                 }
             };
         }
-    }
 
-    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
-        // Aggregation cache is basically a tree map. The tree map entry overhead is
-        // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
-        // - 41~52 according to AggregationCacheMemSizeTest
-        return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
-    }
+        class Dump implements Iterable<Entry<byte[], MeasureAggregator[]>> {
+            File dumpedFile;
+            Input input;
+            SortedMap<byte[], MeasureAggregator[]> buffMap;
 
-    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
-        // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
-        // Memory alignment to 8 bytes
-        long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
-        for (MeasureAggregator aggr : aggrs) {
-            if (aggr != null)
-                est += aggr.getMemBytesEstimate();
+            public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException {
+                this.buffMap = buffMap;
+            }
+
+            @Override
+            public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() {
+                try {
+                    if (dumpedFile == null || !dumpedFile.exists()) {
+                        throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
+                    }
+                    // The stream stays open while entries are read one by one; terminate() closes it
+                    input = new Input(new FileInputStream(dumpedFile));
+                    final int count = kryo.readObject(input, Integer.class); // flush() writes the entry count first
+                    return new Iterator<Entry<byte[], MeasureAggregator[]>>() {
+                        int cursorIdx = 0;
+
+                        @Override
+                        public boolean hasNext() {
+                            return cursorIdx < count;
+                        }
+
+                        @Override
+                        public Entry<byte[], MeasureAggregator[]> next() {
+                            if (!hasNext()) throw new java.util.NoSuchElementException("Dumped file exhausted after " + count + " entries");
+                            try {
+                                cursorIdx++;
+                                return (ImmutablePair<byte[], MeasureAggregator[]>) kryo.readObject(input, ImmutablePair.class);
+                            } catch (Exception e) {
+                                throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage(), e);
+                            }
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to read dumped file: " + e.getMessage(), e);
+                }
+            }
+
+            public void flush() throws IOException {
+                if (buffMap != null) {
+                    Output output = null;
+                    try {
+                        dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp");
+
+                        logger.info("AggregationCache will dump to file: " + dumpedFile.getAbsolutePath());
+                        output = new Output(new FileOutputStream(dumpedFile));
+                        kryo.writeObject(output, buffMap.size());
+                        for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) {
+                            kryo.writeObject(output, new ImmutablePair(entry.getKey(), entry.getValue()));
+                        }
+                    } finally {
+                        buffMap = null;
+                        if (output != null)
+                            output.close();
+                    }
+                }
+            }
+
+            public void terminate() throws IOException {
+                buffMap = null;
+                if (input != null)
+                    input.close();
+                if (dumpedFile != null && dumpedFile.exists())
+                    dumpedFile.delete();
+            }
         }
-        return est;
-    }
 
-    public static long estimateSizeOf(byte[] bytes) {
-        // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
-        // Memory alignment to 8 bytes
-        return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
+        class DumpMerger implements Iterable<Entry<byte[], MeasureAggregator[]>> {
+            final PriorityQueue<Entry<byte[], Integer>> minHeap;
+            final List<Iterator<Entry<byte[], MeasureAggregator[]>>> dumpIterators;
+            final List<MeasureAggregator[]> dumpCurrentValues;
+            // minHeap holds (current key, dump index); dumpCurrentValues[i] holds the measures of dump i's current key
+            public DumpMerger(List<Dump> dumps) {
+                minHeap = new PriorityQueue<>(Math.max(1, dumps.size()), new Comparator<Entry<byte[], Integer>>() { // capacity must be >= 1
+                    @Override
+                    public int compare(Entry<byte[], Integer> o1, Entry<byte[], Integer> o2) {
+                        return bytesComparator.compare(o1.getKey(), o2.getKey());
+                    }
+                });
+                dumpIterators = Lists.newArrayListWithCapacity(dumps.size());
+                dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size());
+
+                Iterator<Entry<byte[], MeasureAggregator[]>> it;
+                for (int i = 0; i < dumps.size(); i++) {
+                    it = dumps.get(i).iterator();
+                    if (it.hasNext()) {
+                        dumpIterators.add(i, it);
+                        Entry<byte[], MeasureAggregator[]> entry = it.next();
+                        minHeap.offer(new ImmutablePair(entry.getKey(), i));
+                        dumpCurrentValues.add(i, entry.getValue());
+                    } else {
+                        dumpIterators.add(i, null);
+                        dumpCurrentValues.add(i, null);
+                    }
+                }
+            }
+
+            private void enqueueFromDump(int index) {
+                if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) {
+                    Entry<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next();
+                    minHeap.offer(new ImmutablePair(entry.getKey(), index));
+                    dumpCurrentValues.set(index, entry.getValue());
+                }
+            }
+
+            @Override
+            public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() {
+                return new Iterator<Entry<byte[], MeasureAggregator[]>>() {
+                    @Override
+                    public boolean hasNext() {
+                        return !CollectionUtils.isEmpty(minHeap);
+                    }
+
+                    @Override
+                    public Entry<byte[], MeasureAggregator[]> next() {
+                        // Merge-sort keys via the min-heap, aggregating measures of equal keys across dumps
+                        if (minHeap.isEmpty()) throw new java.util.NoSuchElementException();
+                        Entry<byte[], Integer> peekEntry = minHeap.poll();
+                        MeasureAggregator[] mergedAggr = dumpCurrentValues.get(peekEntry.getValue());
+                        enqueueFromDump(peekEntry.getValue());
+
+                        while (!minHeap.isEmpty() && bytesComparator.compare(peekEntry.getKey(), minHeap.peek().getKey()) == 0) {
+                            Entry<byte[], Integer> newPeek = minHeap.poll();
+
+                            MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue());
+                            for (int i = 0; i < newPeekAggr.length; i++) {
+                                mergedAggr[i].aggregate(newPeekAggr[i].getState());
+                            }
+
+                            enqueueFromDump(newPeek.getValue());
+                        }
+
+                        return new ImmutablePair(peekEntry.getKey(), mergedAggr);
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
index 1a9f637..aea627a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo.Builder;
@@ -44,6 +45,23 @@ public class UnitTestSupport {
         return info;
     }
 
+    public static GTInfo hllInfo() {
+        Builder builder = GTInfo.builder();
+        builder.setCodeSystem(new GTSampleCodeSystem());
+        builder.setColumns( //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal"), //
+                DataType.getInstance("hllc14") //
+        );
+        builder.setPrimaryKey(setOf(0));
+        builder.setColumnPreferIndex(setOf(0));
+        GTInfo info = builder.build();
+        return info;
+    }
+
     private static Builder infoBuilder() {
         Builder builder = GTInfo.builder();
         builder.setCodeSystem(new GTSampleCodeSystem());
@@ -81,15 +99,37 @@ public class UnitTestSupport {
         return result;
     }
 
+    public static List<GTRecord> mockupHllData(GTInfo info, int nRows) {
+        List<GTRecord> result = new ArrayList<GTRecord>(nRows);
+        int round = nRows / 10;
+        for (int i = 0; i < round; i++) {
+            String d_01_14 = datePlus("2015-01-14", i * 4);
+            String d_01_15 = datePlus("2015-01-15", i * 4);
+            String d_01_16 = datePlus("2015-01-16", i * 4);
+            String d_01_17 = datePlus("2015-01-17", i * 4);
+            result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+        }
+        return result;
+    }
+
     private static String datePlus(String date, int plusDays) {
         long millis = DateFormat.stringToMillis(date);
         millis += (1000L * 3600L * 24L) * plusDays;
         return DateFormat.formatToDateStr(millis);
     }
 
-    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
+    private static GTRecord newRec(GTInfo info, Object... values) {
         GTRecord rec = new GTRecord(info);
-        return rec.setValues(date, name, category, amount, price);
+        return rec.setValues(values);
     }
 
     private static ImmutableBitSet setOf(int... values) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
new file mode 100644
index 0000000..606b591
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by dongli on 12/16/15.
+ */
+public class AggregationCacheSpillTest {
+
+    final GTInfo info = UnitTestSupport.hllInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupHllData(info, 40000); // converts to about 34 MB data
+
+    @BeforeClass
+    public static void beforeClass() {
+        System.setProperty("log4j.configuration", "kylin-log4j.properties");
+    }
+
+    @Test
+    public void testAggregationCacheSpill() throws IOException {
+        final List<GTRecord> testData = Lists.newArrayListWithCapacity(data.size() * 2);
+        testData.addAll(data);
+        testData.addAll(data);
+
+        IGTScanner inputScanner = new IGTScanner() {
+            @Override
+            public GTInfo getInfo() {
+                return info;
+            }
+
+            @Override
+            public int getScannedRowCount() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Iterator<GTRecord> iterator() {
+                return testData.iterator();
+            }
+        };
+        GTScanRequest scanRequest = new GTScanRequest(info, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true);
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, false);
+        int count = 0;
+        try {
+            for (GTRecord record : scanner)
+                count++;
+        } finally {
+            scanner.close(); // deletes the temp files written by the spill
+        }
+        assertEquals(data.size(), count);
+    }
+}
\ No newline at end of file