You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/20 05:16:40 UTC
kylin git commit: KYLIN-1233 Spill to disk when AggregationCache needs
too much memory
Repository: kylin
Updated Branches:
refs/heads/2.0-rc e6b55540a -> cd30e48e3
KYLIN-1233 Spill to disk when AggregationCache needs too much memory
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/cd30e48e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/cd30e48e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/cd30e48e
Branch: refs/heads/2.0-rc
Commit: cd30e48e387e4de44428eb508740dff18642a9c4
Parents: e6b5554
Author: lidongsjtu <do...@ebay.com>
Authored: Sat Dec 19 22:19:13 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 20 11:47:42 2015 +0800
----------------------------------------------------------------------
.../common/hll/HyperLogLogPlusCounter.java | 2 +-
.../org/apache/kylin/cube/util/KryoUtils.java | 2 +-
.../kylin/gridtable/GTAggregateScanner.java | 306 ++++++++++++++++---
.../apache/kylin/gridtable/UnitTestSupport.java | 44 ++-
.../gridtable/AggregationCacheSpillTest.java | 83 +++++
5 files changed, 384 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index ef91509..11ae78f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -42,7 +42,7 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog
private final int p;
private final int m;
- private final HashFunction hashFunc;
+ private transient final HashFunction hashFunc;
byte[] registers;
public HyperLogLogPlusCounter() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
index 9dbe0d2..48f925a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
@@ -48,7 +48,7 @@ public class KryoUtils {
return deserialize(bytes, clazz);
}
- private static Kryo getKryo() {
+ public static Kryo getKryo() {
if (_Kryo.get() == null) {
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 0ed320c..46d51a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -1,20 +1,33 @@
package org.apache.kylin.gridtable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
+import java.util.PriorityQueue;
import java.util.SortedMap;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.cube.util.KryoUtils;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -36,7 +49,7 @@ public class GTAggregateScanner implements IGTScanner {
private MemoryWaterLevel memTracker;
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) {
- if (req.hasAggregation() == false)
+ if (!req.hasAggregation())
throw new IllegalStateException();
this.info = inputScanner.getInfo();
@@ -49,6 +62,30 @@ public class GTAggregateScanner implements IGTScanner {
this.enableMemCheck = enableMemCheck;
}
+    /**
+     * Estimates the heap footprint of an aggregation cache holding {@code size} entries shaped
+     * like the given key/aggregator sample. The cache is basically a tree map, and each entry
+     * carries a fixed overhead on top of its key and value:
+     * - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
+     * - 41~52 according to AggregationCacheMemSizeTest
+     * A flat 64 bytes per entry is used here.
+     */
+    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
+        final long perEntryOverhead = 64;
+        final long perEntry = estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + perEntryOverhead;
+        return perEntry * size;
+    }
+
+    /**
+     * Estimates the footprint of an aggregator array plus the aggregators it references.
+     * For the bare array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2],
+     * 20 for [3] etc.; reference slots are rounded up to 8-byte alignment, then 4 + 4 (extra)
+     * bytes are added. Null slots add nothing beyond their slot.
+     */
+    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
+        final int alignedSlots = (aggrs.length + 1) / 2 * 8;
+        long total = alignedSlots + 4 + 4; // 4 + 4 extra, matching the measured sizes above
+        for (int i = 0; i < aggrs.length; i++) {
+            final MeasureAggregator aggr = aggrs[i];
+            if (aggr == null) {
+                continue;
+            }
+            total += aggr.getMemBytesEstimate();
+        }
+        return total;
+    }
+
+    /**
+     * Estimates the footprint of a byte array: the payload rounded up to 8-byte alignment,
+     * plus 4, plus 4 extra bytes. AggregationCacheMemSizeTest reports 20 for byte[10] and
+     * 20 again for byte[16].
+     */
+    public static long estimateSizeOf(byte[] bytes) {
+        final int alignedPayload = ((bytes.length + 7) / 8) * 8;
+        return alignedPayload + 4 + 4;
+    }
+
public void trackMemoryLevel(MemoryWaterLevel tracker) {
this.memTracker = tracker;
}
@@ -66,6 +103,7 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public void close() throws IOException {
inputScanner.close();
+ aggrCache.close();
}
@Override
@@ -73,6 +111,8 @@ public class GTAggregateScanner implements IGTScanner {
for (GTRecord r : inputScanner) {
aggrCache.aggregate(r);
}
+ logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache());
+ aggrCache.spillBuffMap();
return aggrCache.iterator();
}
@@ -81,35 +121,44 @@ public class GTAggregateScanner implements IGTScanner {
return aggrCache.estimatedMemSize();
}
- class AggregationCache {
- final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+ class AggregationCache implements Closeable {
+ final static double SPILL_THRESHOLD_GB = 0.5;
+
+ final List<Dump> dumps;
final int keyLength;
final boolean[] compareMask;
- public AggregationCache() {
- compareMask = createCompareMask();
- keyLength = compareMask.length;
- aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
- @Override
- public int compare(byte[] o1, byte[] o2) {
- int result = 0;
- // profiler shows this check is slow
- // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
- for (int i = 0; i < keyLength; ++i) {
- if (compareMask[i]) {
- int a = (o1[i] & 0xff);
- int b = (o2[i] & 0xff);
- result = a - b;
- if (result == 0) {
- continue;
- } else {
- return result;
- }
+ final Kryo kryo = KryoUtils.getKryo();
+
+ final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ int result = 0;
+ // profiler shows this check is slow
+ // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
+ for (int i = 0; i < keyLength; ++i) {
+ if (compareMask[i]) {
+ int a = (o1[i] & 0xff);
+ int b = (o2[i] & 0xff);
+ result = a - b;
+ if (result == 0) {
+ continue;
+ } else {
+ return result;
}
}
- return result;
}
- });
+ return result;
+ }
+ };
+
+ SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+
+        /**
+         * Creates an empty cache: no spilled dumps yet and an empty in-memory buffer. The compare
+         * mask selects which key bytes {@code bytesComparator} takes into account.
+         */
+        public AggregationCache() {
+            this.dumps = Lists.newArrayList();
+            this.aggBufMap = createBuffMap();
+            this.compareMask = createCompareMask();
+            this.keyLength = this.compareMask.length;
}
private boolean[] createCompareMask() {
@@ -133,6 +182,10 @@ public class GTAggregateScanner implements IGTScanner {
return mask;
}
+ private SortedMap<byte[], MeasureAggregator[]> createBuffMap() {
+ return Maps.newTreeMap(bytesComparator);
+ }
+
private byte[] createKey(GTRecord record) {
byte[] result = new byte[keyLength];
int offset = 0;
@@ -152,10 +205,13 @@ public class GTAggregateScanner implements IGTScanner {
if (memTracker != null) {
memTracker.markHigh();
}
- long estimated = estimatedMemSize();
- if (estimated > 10 * MemoryBudgetController.ONE_GB) {
- throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated);
- }
+ }
+
+ // Here will spill to disk when aggBufMap used too large memory
+ long estimated = estimatedMemSize();
+ if (estimated > SPILL_THRESHOLD_GB * MemoryBudgetController.ONE_GB) {
+ logger.info("AggregationCache memory estimated size is: " + estimated);
+ spillBuffMap();
}
final byte[] key = createKey(r);
@@ -171,6 +227,28 @@ public class GTAggregateScanner implements IGTScanner {
}
}
+ private void spillBuffMap() throws RuntimeException {
+ try {
+ Dump dump = new Dump(aggBufMap);
+ dump.flush();
+ dumps.add(dump);
+ aggBufMap = createBuffMap();
+ } catch (Exception e) {
+ throw new RuntimeException("AggregationCache spill failed: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() throws RuntimeException {
+ try {
+ for (Dump dump : dumps) {
+ dump.terminate();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("AggregationCache close failed: " + e.getMessage());
+ }
+ }
+
private MeasureAggregator[] newAggregators() {
return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
}
@@ -185,9 +263,11 @@ public class GTAggregateScanner implements IGTScanner {
}
public Iterator<GTRecord> iterator() {
+ final DumpMerger merger = new DumpMerger(dumps);
+
return new Iterator<GTRecord>() {
- final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+ final Iterator<Entry<byte[], MeasureAggregator[]>> it = merger.iterator();
final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
final GTRecord secondRecord = new GTRecord(info);
@@ -227,29 +307,157 @@ public class GTAggregateScanner implements IGTScanner {
}
};
}
- }
- public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
- // Aggregation cache is basically a tree map. The tree map entry overhead is
- // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
- // - 41~52 according to AggregationCacheMemSizeTest
- return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
- }
+        /**
+         * One spilled segment of the aggregation buffer. {@link #flush()} writes the sorted map to a
+         * temp file with Kryo (the entry count, then one key/aggregators pair per entry), and
+         * {@link #iterator()} streams the entries back in the order they were written.
+         */
+        class Dump implements Iterable<Entry<byte[], MeasureAggregator[]>> {
+            // temp file holding the serialized buffer; null until flush() creates it
+            File dumpedFile;
+            // stream of the most recent read pass; released by closeInput() / terminate()
+            Input input;
+            // entries waiting to be written; set to null by flush() and terminate()
+            SortedMap<byte[], MeasureAggregator[]> buffMap;
- public static long estimateSizeOf(MeasureAggregator[] aggrs) {
- // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
- // Memory alignment to 8 bytes
- long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
- for (MeasureAggregator aggr : aggrs) {
- if (aggr != null)
- est += aggr.getMemBytesEstimate();
+
+            public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException {
+                this.buffMap = buffMap;
+            }
+
+            /**
+             * Streams the dumped entries back from the temp file. Each call starts a new pass from
+             * the beginning of the file and closes the stream of any earlier pass, so only the most
+             * recently returned iterator is usable.
+             *
+             * @throws RuntimeException if the file is missing or its entry count cannot be read
+             */
+            @Override
+            public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() {
+                try {
+                    if (dumpedFile == null || !dumpedFile.exists()) {
+                        throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
+                    }
+
+                    // Release the stream of a previous pass instead of leaking its file handle.
+                    closeInput();
+                    final Input in = new Input(new FileInputStream(dumpedFile));
+                    input = in;
+
+                    final int count = kryo.readObject(in, Integer.class);
+                    return new Iterator<Entry<byte[], MeasureAggregator[]>>() {
+                        int cursorIdx = 0;
+
+                        @Override
+                        public boolean hasNext() {
+                            return cursorIdx < count;
+                        }
+
+                        @Override
+                        public Entry<byte[], MeasureAggregator[]> next() {
+                            if (!hasNext()) {
+                                throw new java.util.NoSuchElementException("Dumped file exhausted after " + count + " entries");
+                            }
+                            try {
+                                cursorIdx++;
+                                return (ImmutablePair<byte[], MeasureAggregator[]>) kryo.readObject(in, ImmutablePair.class);
+                            } catch (Exception e) {
+                                throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage(), e);
+                            }
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                } catch (Exception e) {
+                    closeInput();
+                    throw new RuntimeException("Failed to read dumped file: " + e.getMessage(), e);
+                }
+            }
+
+            /**
+             * Writes {@code buffMap} to a fresh temp file and drops the in-memory reference; does
+             * nothing if the buffer was already flushed. If writing fails, the partially written
+             * file is deleted before the error propagates.
+             */
+            public void flush() throws IOException {
+                if (buffMap == null) {
+                    return;
+                }
+                Output output = null;
+                try {
+                    dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp");
+
+                    logger.info("AggregationCache will dump to file: " + dumpedFile.getAbsolutePath());
+                    output = new Output(new FileOutputStream(dumpedFile));
+                    kryo.writeObject(output, buffMap.size());
+                    for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) {
+                        kryo.writeObject(output, new ImmutablePair(entry.getKey(), entry.getValue()));
+                    }
+                    // close() flushes Kryo's buffer, so a failure here must also discard the file
+                    output.close();
+                    output = null;
+                } catch (IOException | RuntimeException e) {
+                    discardPartialFile(output);
+                    throw e;
+                } finally {
+                    buffMap = null;
+                }
+            }
+
+            /** Best-effort cleanup after a failed flush: closes the stream and deletes the partial file. */
+            private void discardPartialFile(Output output) {
+                if (output != null) {
+                    try {
+                        output.close();
+                    } catch (Exception ignored) {
+                        // the write already failed; keep reporting that error instead of this one
+                    }
+                }
+                if (dumpedFile != null && dumpedFile.exists() && !dumpedFile.delete()) {
+                    logger.warn("Failed to delete partial dump file: " + dumpedFile.getAbsolutePath());
+                }
+                dumpedFile = null;
+            }
+
+            /** Closes the stream of the current read pass, if any; close failures are only logged. */
+            private void closeInput() {
+                if (input == null) {
+                    return;
+                }
+                try {
+                    input.close();
+                } catch (Exception e) {
+                    logger.warn("Failed to close dumped file input: " + e.getMessage(), e);
+                } finally {
+                    input = null;
+                }
+            }
+
+            /**
+             * Releases everything this dump holds: the buffer reference, the read stream and the
+             * temp file. The file is deleted even if closing the stream fails.
+             */
+            public void terminate() throws IOException {
+                buffMap = null;
+                try {
+                    if (input != null) {
+                        input.close();
+                    }
+                } finally {
+                    input = null;
+                    if (dumpedFile != null && dumpedFile.exists() && !dumpedFile.delete()) {
+                        logger.warn("Failed to delete dumped file: " + dumpedFile.getAbsolutePath());
+                    }
+                }
+            }
}
- return est;
- }
- public static long estimateSizeOf(byte[] bytes) {
- // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
- // Memory alignment to 8 bytes
- return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
+        /**
+         * K-way merge over several {@link Dump}s. Each dump was written from a buffer ordered by
+         * {@code bytesComparator}, so a min-heap holding the current head key of every
+         * non-exhausted dump yields keys in ascending order. Entries with equal keys in
+         * different dumps are folded together by aggregating their measures.
+         */
+        class DumpMerger implements Iterable<Entry<byte[], MeasureAggregator[]>> {
+            // heap entries are (key, index of the dump the key came from)
+            final PriorityQueue<Entry<byte[], Integer>> minHeap;
+            final List<Iterator<Entry<byte[], MeasureAggregator[]>>> dumpIterators;
+            // aggregators of the entry currently queued in the heap for each dump
+            final List<MeasureAggregator[]> dumpCurrentValues;
+
+            public DumpMerger(List<Dump> dumps) {
+                // PriorityQueue rejects an initial capacity below 1, so guard against an empty dump list.
+                minHeap = new PriorityQueue<>(Math.max(1, dumps.size()), new Comparator<Entry<byte[], Integer>>() {
+                    @Override
+                    public int compare(Entry<byte[], Integer> o1, Entry<byte[], Integer> o2) {
+                        return bytesComparator.compare(o1.getKey(), o2.getKey());
+                    }
+                });
+                dumpIterators = Lists.newArrayListWithCapacity(dumps.size());
+                dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size());
+
+                for (int i = 0; i < dumps.size(); i++) {
+                    dumpIterators.add(dumps.get(i).iterator());
+                    dumpCurrentValues.add(null);
+                    enqueueFromDump(i);
+                }
+            }
+
+            /** Pulls the next entry of dump {@code index}, if any, into the heap and records its aggregators. */
+            private void enqueueFromDump(int index) {
+                Iterator<Entry<byte[], MeasureAggregator[]>> it = dumpIterators.get(index);
+                if (it != null && it.hasNext()) {
+                    Entry<byte[], MeasureAggregator[]> entry = it.next();
+                    minHeap.offer(new ImmutablePair<byte[], Integer>(entry.getKey(), index));
+                    dumpCurrentValues.set(index, entry.getValue());
+                }
+            }
+
+            @Override
+            public Iterator<Entry<byte[], MeasureAggregator[]>> iterator() {
+                return new Iterator<Entry<byte[], MeasureAggregator[]>>() {
+                    @Override
+                    public boolean hasNext() {
+                        return !CollectionUtils.isEmpty(minHeap);
+                    }
+
+                    @Override
+                    public Entry<byte[], MeasureAggregator[]> next() {
+                        // Use minimum heap to merge sort the keys,
+                        // also do aggregation for measures with same keys in different dumps
+                        Entry<byte[], Integer> head = minHeap.poll();
+                        if (head == null) {
+                            throw new java.util.NoSuchElementException();
+                        }
+                        MeasureAggregator[] mergedAggr = dumpCurrentValues.get(head.getValue());
+                        // The next key from the same dump is strictly greater, so it cannot join this group.
+                        enqueueFromDump(head.getValue());
+
+                        while (!minHeap.isEmpty() && bytesComparator.compare(head.getKey(), minHeap.peek().getKey()) == 0) {
+                            Entry<byte[], Integer> sameKey = minHeap.poll();
+
+                            MeasureAggregator[] otherAggr = dumpCurrentValues.get(sameKey.getValue());
+                            for (int i = 0; i < otherAggr.length; i++) {
+                                mergedAggr[i].aggregate(otherAggr[i].getState());
+                            }
+
+                            enqueueFromDump(sameKey.getValue());
+                        }
+
+                        return new ImmutablePair<byte[], MeasureAggregator[]>(head.getKey(), mergedAggr);
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
index 1a9f637..aea627a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo.Builder;
@@ -44,6 +45,23 @@ public class UnitTestSupport {
return info;
}
+    /**
+     * Builds the GTInfo used by the HLL aggregation tests: three varchar(10) columns followed by
+     * bigint, decimal and hllc14 columns, with column 0 as primary key and preferred index column.
+     */
+    public static GTInfo hllInfo() {
+        final Builder builder = GTInfo.builder();
+        builder.setCodeSystem(new GTSampleCodeSystem());
+        final String varchar10 = "varchar(10)";
+        builder.setColumns( //
+                DataType.getInstance(varchar10), //
+                DataType.getInstance(varchar10), //
+                DataType.getInstance(varchar10), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal"), //
+                DataType.getInstance("hllc14") //
+        );
+        builder.setPrimaryKey(setOf(0));
+        builder.setColumnPreferIndex(setOf(0));
+        return builder.build();
+    }
+
private static Builder infoBuilder() {
Builder builder = GTInfo.builder();
builder.setCodeSystem(new GTSampleCodeSystem());
@@ -81,15 +99,37 @@ public class UnitTestSupport {
return result;
}
+    /**
+     * Generates exactly {@code nRows} records for {@link #hllInfo()}. Rows are emitted in rounds
+     * of ten; round {@code r} shifts every base date by {@code 4 * r} days, so the (date, name)
+     * keys never repeat across rounds. Every row has category "Food", amount 10, price 10.5 and a
+     * fresh {@code HyperLogLogPlusCounter(14)}. When {@code nRows} is not a multiple of ten, the
+     * last round is cut short instead of being dropped (the earlier version returned only
+     * {@code nRows / 10 * 10} rows). A non-positive {@code nRows} yields an empty list.
+     */
+    public static List<GTRecord> mockupHllData(GTInfo info, int nRows) {
+        // (base date, name) of the ten rows in one round, in emission order
+        final String[][] roundTemplate = { //
+                { "2015-01-14", "Yang" }, //
+                { "2015-01-14", "Luke" }, //
+                { "2015-01-15", "Xu" }, //
+                { "2015-01-15", "Dong" }, //
+                { "2015-01-15", "Jason" }, //
+                { "2015-01-16", "Mahone" }, //
+                { "2015-01-16", "Shaofeng" }, //
+                { "2015-01-16", "Qianhao" }, //
+                { "2015-01-16", "George" }, //
+                { "2015-01-17", "Kejia" } //
+        };
+        List<GTRecord> result = new ArrayList<GTRecord>(Math.max(0, nRows));
+        for (int i = 0; i < nRows; i++) {
+            String[] slot = roundTemplate[i % roundTemplate.length];
+            int round = i / roundTemplate.length;
+            String date = datePlus(slot[0], round * 4);
+            result.add(newRec(info, date, slot[1], "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+        }
+        return result;
+    }
+
private static String datePlus(String date, int plusDays) {
long millis = DateFormat.stringToMillis(date);
millis += (1000L * 3600L * 24L) * plusDays;
return DateFormat.formatToDateStr(millis);
}
- private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
+ private static GTRecord newRec(GTInfo info, Object... values) {
GTRecord rec = new GTRecord(info);
- return rec.setValues(date, name, category, amount, price);
+ return rec.setValues(values);
}
private static ImmutableBitSet setOf(int... values) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/cd30e48e/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
new file mode 100644
index 0000000..606b591
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Checks that {@link GTAggregateScanner} returns one row per distinct key after its
+ * AggregationCache goes through the spill-to-disk path. Every record is fed twice, so the
+ * aggregated output must have exactly as many rows as the (distinct-keyed) input data.
+ *
+ * NOTE(review): ~34 MB of data presumably stays below AggregationCache's 0.5 GB spill
+ * threshold, so this likely exercises only the final spill (a single dump), not the
+ * multi-dump merge. Confirm, and consider a lower threshold for tests.
+ *
+ * Created by dongli on 12/16/15.
+ */
+public class AggregationCacheSpillTest {
+
+    final GTInfo info = UnitTestSupport.hllInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupHllData(info, 40000); // converts to about 34 MB data
+
+    @BeforeClass
+    public static void beforeClass() {
+        System.setProperty("log4j.configuration", "kylin-log4j.properties");
+    }
+
+    @Test
+    public void testAggregationCacheSpill() throws IOException {
+        // Feed every record twice so each key has to be aggregated.
+        final List<GTRecord> testData = Lists.newArrayListWithCapacity(data.size() * 2);
+        testData.addAll(data);
+        testData.addAll(data);
+
+        IGTScanner inputScanner = new IGTScanner() {
+            @Override
+            public GTInfo getInfo() {
+                return info;
+            }
+
+            @Override
+            public int getScannedRowCount() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Iterator<GTRecord> iterator() {
+                return testData.iterator();
+            }
+        };
+
+        GTScanRequest scanRequest = new GTScanRequest(info, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true);
+
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, false);
+
+        int count = 0;
+        try {
+            for (GTRecord record : scanner) {
+                count++;
+            }
+        } finally {
+            // Closing the scanner deletes the temp dump files written when the cache spilled.
+            scanner.close();
+        }
+        assertEquals(data.size(), count);
+    }
+}
\ No newline at end of file