You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/27 07:37:28 UTC

[1/3] kylin git commit: KYLIN-1233 Spill to disk when AggregationCache needs too much memory

Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 35a5d87af -> e58326999


KYLIN-1233 Spill to disk when AggregationCache needs too much memory


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eaed4f6b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eaed4f6b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eaed4f6b

Branch: refs/heads/2.x-staging
Commit: eaed4f6bdf63385590b02360e0f5834478088db2
Parents: 35a5d87
Author: lidongsjtu <do...@ebay.com>
Authored: Sun Dec 27 11:05:11 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 27 11:05:11 2015 +0800

----------------------------------------------------------------------
 .../common/hll/HyperLogLogPlusCounter.java      |   2 +-
 .../java/org/apache/kylin/common/util/Pair.java |  46 ++-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   2 +-
 .../org/apache/kylin/cube/util/KryoUtils.java   |   2 +-
 .../kylin/gridtable/GTAggregateScanner.java     | 406 +++++++++++++++----
 .../apache/kylin/gridtable/GTScanRequest.java   |  17 +-
 .../coprocessor/endpoint/CubeVisitService.java  |  10 +-
 7 files changed, 377 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index ef91509..11ae78f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -42,7 +42,7 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog
 
     private final int p;
     private final int m;
-    private final HashFunction hashFunc;
+    private transient final HashFunction hashFunc;
     byte[] registers;
 
     public HyperLogLogPlusCounter() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
index d28b05f..9e4e9ee 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
  */
 public class Pair<T1, T2> implements Serializable {
     private static final long serialVersionUID = -3986244606585552569L;
-    
+
     protected T1 first = null;
     protected T2 second = null;
 
@@ -60,6 +60,18 @@ public class Pair<T1, T2> implements Serializable {
         return new Pair<T1, T2>(a, b);
     }
 
+    private static boolean equals(Object x, Object y) {
+        return (x == null && y == null) || (x != null && x.equals(y));
+    }
+
+    /**
+     * Return the first element stored in the pair.
+     * @return T1
+     */
+    public T1 getFirst() {
+        return first;
+    }
+
     /**
      * Replace the first element of the pair.
      * @param a operand
@@ -68,20 +80,12 @@ public class Pair<T1, T2> implements Serializable {
         this.first = a;
     }
 
-    /**
-     * Replace the second element of the pair.
-     * @param b operand
-     */
-    public void setSecond(T2 b) {
-        this.second = b;
+    public T1 getKey() {
+        return getFirst();
     }
 
-    /**
-     * Return the first element stored in the pair.
-     * @return T1
-     */
-    public T1 getFirst() {
-        return first;
+    public void setKey(T1 a) {
+        setFirst(a);
     }
 
     /**
@@ -92,8 +96,20 @@ public class Pair<T1, T2> implements Serializable {
         return second;
     }
 
-    private static boolean equals(Object x, Object y) {
-        return (x == null && y == null) || (x != null && x.equals(y));
+    /**
+     * Replace the second element of the pair.
+     * @param b operand
+     */
+    public void setSecond(T2 b) {
+        this.second = b;
+    }
+
+    public T2 getValue() {
+        return getSecond();
+    }
+
+    public void setValue(T2 b) {
+        setSecond(b);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 575583f..4bad818 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -330,7 +330,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
         Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
         GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
-        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, true);
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
         aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
 
         int count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
index 9dbe0d2..48f925a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
@@ -48,7 +48,7 @@ public class KryoUtils {
         return deserialize(bytes, clazz);
     }
 
-    private static Kryo getKryo() {
+    public static Kryo getKryo() {
         if (_Kryo.get() == null) {
             Kryo kryo = new Kryo();
             kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));

http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 658a08f..a760b92 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -1,26 +1,37 @@
 package org.apache.kylin.gridtable;
 
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
+import java.util.PriorityQueue;
 import java.util.SortedMap;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.util.KryoUtils;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class GTAggregateScanner implements IGTScanner {
 
-    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
 
     final GTInfo info;
@@ -30,13 +41,13 @@ public class GTAggregateScanner implements IGTScanner {
     final String[] metricsAggrFuncs;
     final IGTScanner inputScanner;
     final AggregationCache aggrCache;
-    final boolean enableMemCheck;
+    final long spillThreshold;
 
     private int aggregatedRowCount = 0;
     private MemoryWaterLevel memTracker;
 
-    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) {
-        if (req.hasAggregation() == false)
+    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+        if (!req.hasAggregation())
             throw new IllegalStateException();
 
         this.info = inputScanner.getInfo();
@@ -46,7 +57,31 @@ public class GTAggregateScanner implements IGTScanner {
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
         this.inputScanner = inputScanner;
         this.aggrCache = new AggregationCache();
-        this.enableMemCheck = enableMemCheck;
+        this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB);
+    }
+
+    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
+        // Aggregation cache is basically a tree map. The tree map entry overhead is
+        // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
+        // - 41~52 according to AggregationCacheMemSizeTest
+        return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
+    }
+
+    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
+        // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
+        // Memory alignment to 8 bytes
+        long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
+        for (MeasureAggregator aggr : aggrs) {
+            if (aggr != null)
+                est += aggr.getMemBytesEstimate();
+        }
+        return est;
+    }
+
+    public static long estimateSizeOf(byte[] bytes) {
+        // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
+        // Memory alignment to 8 bytes
+        return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
     }
 
     public void trackMemoryLevel(MemoryWaterLevel tracker) {
@@ -66,6 +101,7 @@ public class GTAggregateScanner implements IGTScanner {
     @Override
     public void close() throws IOException {
         inputScanner.close();
+        aggrCache.close();
     }
 
     @Override
@@ -76,40 +112,51 @@ public class GTAggregateScanner implements IGTScanner {
         return aggrCache.iterator();
     }
 
+    public int getNumOfSpills() {
+        return aggrCache.dumps.size();
+    }
+
     /** return the estimate memory size of aggregation cache */
     public long getEstimateSizeOfAggrCache() {
         return aggrCache.estimatedMemSize();
     }
 
-    class AggregationCache {
-        final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+    class AggregationCache implements Closeable {
+        final List<Dump> dumps;
         final int keyLength;
         final boolean[] compareMask;
 
-        public AggregationCache() {
-            compareMask = createCompareMask();
-            keyLength = compareMask.length;
-            aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
-                @Override
-                public int compare(byte[] o1, byte[] o2) {
-                    int result = 0;
-                    // profiler shows this check is slow
-                    // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
-                    for (int i = 0; i < keyLength; ++i) {
-                        if (compareMask[i]) {
-                            int a = (o1[i] & 0xff);
-                            int b = (o2[i] & 0xff);
-                            result = a - b;
-                            if (result == 0) {
-                                continue;
-                            } else {
-                                return result;
-                            }
+        final Kryo kryo = KryoUtils.getKryo();
+
+        final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                int result = 0;
+                // profiler shows this check is slow
+                // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
+                for (int i = 0; i < keyLength; ++i) {
+                    if (compareMask[i]) {
+                        int a = (o1[i] & 0xff);
+                        int b = (o2[i] & 0xff);
+                        result = a - b;
+                        if (result == 0) {
+                            continue;
+                        } else {
+                            return result;
                         }
                     }
-                    return result;
                 }
-            });
+                return result;
+            }
+        };
+
+        SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+
+        public AggregationCache() {
+            compareMask = createCompareMask();
+            keyLength = compareMask.length;
+            dumps = Lists.newArrayList();
+            aggBufMap = createBuffMap();
         }
 
         private boolean[] createCompareMask() {
@@ -133,6 +180,10 @@ public class GTAggregateScanner implements IGTScanner {
             return mask;
         }
 
+        private SortedMap<byte[], MeasureAggregator[]> createBuffMap() {
+            return Maps.newTreeMap(bytesComparator);
+        }
+
         private byte[] createKey(GTRecord record) {
             byte[] result = new byte[keyLength];
             int offset = 0;
@@ -148,13 +199,15 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         void aggregate(GTRecord r) {
-            if (enableMemCheck && (++aggregatedRowCount % 1000 == 0)) {
+            if (++aggregatedRowCount % 1000 == 0) {
                 if (memTracker != null) {
                     memTracker.markHigh();
                 }
-                long estimated = estimatedMemSize();
-                if (estimated > 10 * MemoryBudgetController.ONE_GB) {
-                    throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated);
+                if (spillThreshold > 0) {
+                    // spill to disk when aggBufMap used too large memory
+                    if (estimatedMemSize() > spillThreshold) {
+                        spillBuffMap();
+                    }
                 }
             }
 
@@ -171,6 +224,31 @@ public class GTAggregateScanner implements IGTScanner {
             }
         }
 
+        private void spillBuffMap() throws RuntimeException {
+            if (aggBufMap.isEmpty())
+                return;
+
+            try {
+                Dump dump = new Dump(aggBufMap);
+                dump.flush();
+                dumps.add(dump);
+                aggBufMap = createBuffMap();
+            } catch (Exception e) {
+                throw new RuntimeException("AggregationCache spill failed: " + e.getMessage());
+            }
+        }
+
+        @Override
+        public void close() throws RuntimeException {
+            try {
+                for (Dump dump : dumps) {
+                    dump.terminate();
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("AggregationCache close failed: " + e.getMessage());
+            }
+        }
+
         private MeasureAggregator[] newAggregators() {
             return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
         }
@@ -185,71 +263,233 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
+            // the all-in-mem case
+            if (dumps.isEmpty()) {
+                return new Iterator<GTRecord>() {
+
+                    final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+                    final ReturningRecord returningRecord = new ReturningRecord();
+
+                    @Override
+                    public boolean hasNext() {
+                        return it.hasNext();
+                    }
 
-                final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+                    @Override
+                    public GTRecord next() {
+                        Entry<byte[], MeasureAggregator[]> entry = it.next();
+                        returningRecord.load(entry.getKey(), entry.getValue());
+                        return returningRecord.record;
+                    }
 
-                final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
-                final GTRecord secondRecord = new GTRecord(info);
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+            // the spill case
+            else {
+                logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache());
+                this.spillBuffMap();
+
+                return new Iterator<GTRecord>() {
+                    final DumpMerger merger = new DumpMerger(dumps);
+                    final Iterator<Pair<byte[], MeasureAggregator[]>> it = merger.iterator();
+                    final ReturningRecord returningRecord = new ReturningRecord();
+
+                    @Override
+                    public boolean hasNext() {
+                        return it.hasNext();
+                    }
 
-                @Override
-                public boolean hasNext() {
-                    return it.hasNext();
+                    @Override
+                    public GTRecord next() {
+                        Pair<byte[], MeasureAggregator[]> entry = it.next();
+                        returningRecord.load(entry.getKey(), entry.getValue());
+                        return returningRecord.record;
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        }
+
+        class ReturningRecord {
+            final GTRecord record = new GTRecord(info);
+            final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+
+            void load(byte[] key, MeasureAggregator[] value) {
+                int offset = 0;
+                for (int i = 0; i < dimensions.trueBitCount(); i++) {
+                    int c = dimensions.trueBitAt(i);
+                    final int columnLength = info.codeSystem.maxCodeLength(c);
+                    record.cols[c].set(key, offset, columnLength);
+                    offset += columnLength;
+                }
+                metricsBuf.clear();
+                for (int i = 0; i < value.length; i++) {
+                    int col = metrics.trueBitAt(i);
+                    int pos = metricsBuf.position();
+                    info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
+                    record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
                 }
+            }
+        }
+
+        class Dump implements Iterable<Pair<byte[], MeasureAggregator[]>> {
+            File dumpedFile;
+            Input input;
+            SortedMap<byte[], MeasureAggregator[]> buffMap;
+
+            public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException {
+                this.buffMap = buffMap;
+            }
+
+            @Override
+            public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
+                try {
+                    if (dumpedFile == null || !dumpedFile.exists()) {
+                        throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
+                    }
+
+                    input = new Input(new FileInputStream(dumpedFile));
+
+                    final int count = kryo.readObject(input, Integer.class);
+                    return new Iterator<Pair<byte[], MeasureAggregator[]>>() {
+                        int cursorIdx = 0;
+
+                        @Override
+                        public boolean hasNext() {
+                            return cursorIdx < count;
+                        }
+
+                        @Override
+                        public Pair<byte[], MeasureAggregator[]> next() {
+                            try {
+                                cursorIdx++;
+                                return (Pair<byte[], MeasureAggregator[]>) kryo.readObject(input, Pair.class);
+                            } catch (Exception e) {
+                                throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage());
+                            }
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to read dumped file: " + e.getMessage());
+                }
+            }
 
-                @Override
-                public GTRecord next() {
-                    Entry<byte[], MeasureAggregator[]> entry = it.next();
-                    create(entry.getKey(), entry.getValue());
-                    return secondRecord;
+            public void flush() throws IOException {
+                if (buffMap != null) {
+                    Output output = null;
+                    try {
+                        dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp");
+
+                        logger.info("AggregationCache will dump to file: " + dumpedFile.getAbsolutePath());
+                        output = new Output(new FileOutputStream(dumpedFile));
+                        kryo.writeObject(output, buffMap.size());
+                        for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) {
+                            kryo.writeObject(output, new Pair(entry.getKey(), entry.getValue()));
+                        }
+                    } finally {
+                        buffMap = null;
+                        if (output != null)
+                            output.close();
+                    }
                 }
+            }
 
-                private void create(byte[] key, MeasureAggregator[] value) {
-                    int offset = 0;
-                    for (int i = 0; i < dimensions.trueBitCount(); i++) {
-                        int c = dimensions.trueBitAt(i);
-                        final int columnLength = info.codeSystem.maxCodeLength(c);
-                        secondRecord.set(c, new ByteArray(key, offset, columnLength));
-                        offset += columnLength;
+            public void terminate() throws IOException {
+                buffMap = null;
+                if (input != null)
+                    input.close();
+                if (dumpedFile != null && dumpedFile.exists())
+                    dumpedFile.delete();
+            }
+        }
+
+        class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>> {
+            final PriorityQueue<Pair<byte[], Integer>> minHeap;
+            final List<Iterator<Pair<byte[], MeasureAggregator[]>>> dumpIterators;
+            final List<MeasureAggregator[]> dumpCurrentValues;
+
+            public DumpMerger(List<Dump> dumps) {
+                minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Pair<byte[], Integer>>() {
+                    @Override
+                    public int compare(Pair<byte[], Integer> o1, Pair<byte[], Integer> o2) {
+                        return bytesComparator.compare(o1.getFirst(), o2.getFirst());
                     }
-                    metricsBuf.clear();
-                    for (int i = 0; i < value.length; i++) {
-                        int col = metrics.trueBitAt(i);
-                        int pos = metricsBuf.position();
-                        info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
-                        secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+                });
+                dumpIterators = Lists.newArrayListWithCapacity(dumps.size());
+                dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size());
+
+                Iterator<Pair<byte[], MeasureAggregator[]>> it;
+                for (int i = 0; i < dumps.size(); i++) {
+                    it = dumps.get(i).iterator();
+                    if (it.hasNext()) {
+                        dumpIterators.add(i, it);
+                        Pair<byte[], MeasureAggregator[]> entry = it.next();
+                        minHeap.offer(new Pair(entry.getKey(), i));
+                        dumpCurrentValues.add(i, entry.getValue());
+                    } else {
+                        dumpIterators.add(i, null);
+                        dumpCurrentValues.add(i, null);
                     }
                 }
+            }
 
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
+            private void enqueueFromDump(int index) {
+                if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) {
+                    Pair<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next();
+                    minHeap.offer(new Pair(entry.getKey(), index));
+                    dumpCurrentValues.set(index, entry.getValue());
                 }
-            };
-        }
-    }
+            }
 
-    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
-        // Aggregation cache is basically a tree map. The tree map entry overhead is
-        // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
-        // - 41~52 according to AggregationCacheMemSizeTest
-        return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
-    }
+            @Override
+            public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
+                return new Iterator<Pair<byte[], MeasureAggregator[]>>() {
+                    @Override
+                    public boolean hasNext() {
+                        return !minHeap.isEmpty();
+                    }
 
-    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
-        // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
-        // Memory alignment to 8 bytes
-        long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
-        for (MeasureAggregator aggr : aggrs) {
-            if (aggr != null)
-                est += aggr.getMemBytesEstimate();
-        }
-        return est;
-    }
+                    @Override
+                    public Pair<byte[], MeasureAggregator[]> next() {
+                        // Use minimum heap to merge sort the keys,
+                        // also do aggregation for measures with same keys in different dumps
+                        Pair<byte[], Integer> peekEntry = minHeap.poll();
+                        MeasureAggregator[] mergedAggr = dumpCurrentValues.get(peekEntry.getValue());
+                        enqueueFromDump(peekEntry.getValue());
 
-    public static long estimateSizeOf(byte[] bytes) {
-        // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
-        // Memory alignment to 8 bytes
-        return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
+                        while (!minHeap.isEmpty() && bytesComparator.compare(peekEntry.getKey(), minHeap.peek().getKey()) == 0) {
+                            Pair<byte[], Integer> newPeek = minHeap.poll();
+
+                            MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue());
+                            for (int i = 0; i < newPeekAggr.length; i++) {
+                                mergedAggr[i].aggregate(newPeekAggr[i].getState());
+                            }
+
+                            enqueueFromDump(newPeek.getValue());
+                        }
+
+                        return new Pair(peekEntry.getKey(), mergedAggr);
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 2c284c9..ac99d4e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -29,6 +29,7 @@ public class GTScanRequest {
 
     // hint to storage behavior
     private boolean allowPreAggregation = true;
+    private double aggrCacheGB = 0; // no limit
 
     public GTScanRequest(GTInfo info) {
         this(info, null, null, null);
@@ -118,7 +119,7 @@ public class GTScanRequest {
     }
 
     public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
-        return decorateScanner(scanner, true, true, false);//by default do not check mem
+        return decorateScanner(scanner, true, true);//by default do not check mem
     }
 
     /**
@@ -127,7 +128,7 @@ public class GTScanRequest {
      * 
      * Refer to CoprocessorBehavior for explanation
      */
-    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, boolean doMemCheck) throws IOException {
+    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
         IGTScanner result = scanner;
         if (!doFilter) { //Skip reading this section if you're not profiling! 
             lookAndForget(result);
@@ -144,13 +145,13 @@ public class GTScanRequest {
             }
 
             if (this.allowPreAggregation && this.hasAggregation()) {
-                result = new GTAggregateScanner(result, this, doMemCheck);
+                result = new GTAggregateScanner(result, this);
             }
             return result;
         }
     }
 
-    //touch every byte of the cell so that the cost of scanning will be trully reflected
+    //touch every byte of the cell so that the cost of scanning will be truly reflected
     private void lookAndForget(IGTScanner scanner) {
         byte meaninglessByte = 0;
         for (GTRecord gtRecord : scanner) {
@@ -215,6 +216,14 @@ public class GTScanRequest {
         return aggrMetricsFuncs;
     }
 
+    public double getAggrCacheGB() {
+        return aggrCacheGB;
+    }
+
+    public void setAggrCacheGB(double gb) {
+        this.aggrCacheGB = gb;
+    }
+
     @Override
     public String toString() {
         return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";

http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3759738..6feed33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -147,14 +147,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             innerScanner = region.getScanner(scan);
             InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
 
+            CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+            if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+                if (scanReq.getAggrCacheGB() <= 0)
+                    scanReq.setAggrCacheGB(10); // 10 GB threshold, inherit from v1.0
+            }
+
             IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
             IGTScanner rawScanner = store.scan(scanReq);
 
-            CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner,//
                     behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(),//
-                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(),//
-                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal());//
+                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
 
             ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
 


[2/3] kylin git commit: KYLIN-1233 Add test cases for the spill and in-memory modes of AggregationCache

Posted by li...@apache.org.
KYLIN-1233 Add test cases for the spill and in-memory modes of AggregationCache


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a46a25dd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a46a25dd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a46a25dd

Branch: refs/heads/2.x-staging
Commit: a46a25dd096ad54add369e9067401229a797d11f
Parents: eaed4f6
Author: lidongsjtu <do...@ebay.com>
Authored: Sun Dec 27 14:34:06 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 27 14:34:06 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/gridtable/UnitTestSupport.java |  44 +++++-
 .../gridtable/AggregationCacheSpillTest.java    | 140 +++++++++++++++++++
 2 files changed, 182 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a46a25dd/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
index ff71b4f..08187e9 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo.Builder;
@@ -44,6 +45,23 @@ public class UnitTestSupport {
         return info;
     }
 
+    public static GTInfo hllInfo() {
+        Builder builder = GTInfo.builder();
+        builder.setCodeSystem(new GTSampleCodeSystem());
+        builder.setColumns( //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("bigint"), //
+                DataType.getType("decimal"), //
+                DataType.getType("hllc14") //
+        );
+        builder.setPrimaryKey(setOf(0));
+        builder.setColumnPreferIndex(setOf(0));
+        GTInfo info = builder.build();
+        return info;
+    }
+
     private static Builder infoBuilder() {
         Builder builder = GTInfo.builder();
         builder.setCodeSystem(new GTSampleCodeSystem());
@@ -81,15 +99,37 @@ public class UnitTestSupport {
         return result;
     }
 
+    public static List<GTRecord> mockupHllData(GTInfo info, int nRows) {
+        List<GTRecord> result = new ArrayList<GTRecord>(nRows);
+        int round = nRows / 10;
+        for (int i = 0; i < round; i++) {
+            String d_01_14 = datePlus("2015-01-14", i * 4);
+            String d_01_15 = datePlus("2015-01-15", i * 4);
+            String d_01_16 = datePlus("2015-01-16", i * 4);
+            String d_01_17 = datePlus("2015-01-17", i * 4);
+            result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+            result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+        }
+        return result;
+    }
+
     private static String datePlus(String date, int plusDays) {
         long millis = DateFormat.stringToMillis(date);
         millis += (1000L * 3600L * 24L) * plusDays;
         return DateFormat.formatToDateStr(millis);
     }
 
-    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
+    private static GTRecord newRec(GTInfo info, Object... values) {
         GTRecord rec = new GTRecord(info);
-        return rec.setValues(date, name, category, amount, price);
+        return rec.setValues(values);
     }
 
     private static ImmutableBitSet setOf(int... values) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/a46a25dd/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
new file mode 100644
index 0000000..7fd5ce7
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.datatype.LongMutable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by dongli on 12/16/15.
+ */
+public class AggregationCacheSpillTest {
+
+    final static int DATA_CARDINALITY = 40000;
+    final static int DATA_REPLICATION = 2;
+
+    final static GTInfo INFO = UnitTestSupport.hllInfo();
+    final static List<GTRecord> TEST_DATA = Lists.newArrayListWithCapacity(DATA_CARDINALITY * DATA_REPLICATION);;
+
+    @BeforeClass
+    public static void beforeClass() {
+        System.setProperty("log4j.configuration", "kylin-log4j.properties");
+
+        final List<GTRecord> data = UnitTestSupport.mockupHllData(INFO, DATA_CARDINALITY);
+        for (int i = 0; i < DATA_REPLICATION; i++)
+            TEST_DATA.addAll(data);
+    }
+
+    @Test
+    public void testAggregationCacheSpill() throws IOException {
+        IGTScanner inputScanner = new IGTScanner() {
+            @Override
+            public GTInfo getInfo() {
+                return INFO;
+            }
+
+            @Override
+            public int getScannedRowCount() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Iterator<GTRecord> iterator() {
+                return TEST_DATA.iterator();
+            }
+        };
+
+        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true);
+        scanRequest.setAggrCacheGB(0.5); // 500 MB
+
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+
+        int count = 0;
+        for (GTRecord record : scanner) {
+            assertNotNull(record);
+            Object[] returnRecord = record.getValues();
+            assertEquals(20, ((LongMutable) returnRecord[3]).get());
+            assertEquals(21, ((BigDecimal) returnRecord[4]).longValue());
+            count++;
+
+            System.out.println(record);
+        }
+        assertEquals(DATA_CARDINALITY, count);
+        scanner.close();
+    }
+
+    @Test
+    public void testAggregationCacheInMem() throws IOException {
+        IGTScanner inputScanner = new IGTScanner() {
+            @Override
+            public GTInfo getInfo() {
+                return INFO;
+            }
+
+            @Override
+            public int getScannedRowCount() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Iterator<GTRecord> iterator() {
+                return TEST_DATA.iterator();
+            }
+        };
+
+        // all-in-mem testcase
+        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true);
+        scanRequest.setAggrCacheGB(0.5); // 500 MB
+
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+
+        int count = 0;
+        for (GTRecord record : scanner) {
+            assertNotNull(record);
+            Object[] returnRecord = record.getValues();
+            assertEquals(80000, ((LongMutable) returnRecord[3]).get());
+            assertEquals(84000, ((BigDecimal) returnRecord[4]).longValue());
+            count++;
+
+            System.out.println(record);
+        }
+        assertEquals(10, count);
+        scanner.close();
+    }
+}
\ No newline at end of file


[3/3] kylin git commit: minor: fix typo in the Enable Cache button tooltip

Posted by li...@apache.org.
minor: fix typo in the Enable Cache button tooltip


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e5832699
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e5832699
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e5832699

Branch: refs/heads/2.x-staging
Commit: e58326999711bef09a823026038ec3adf0acf2ad
Parents: a46a25d
Author: lidongsjtu <do...@ebay.com>
Authored: Sun Dec 27 14:35:10 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 27 14:35:10 2015 +0800

----------------------------------------------------------------------
 webapp/app/partials/admin/admin.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e5832699/webapp/app/partials/admin/admin.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/admin/admin.html b/webapp/app/partials/admin/admin.html
index 3e063a6..0ed5ab5 100644
--- a/webapp/app/partials/admin/admin.html
+++ b/webapp/app/partials/admin/admin.html
@@ -61,7 +61,7 @@
         </div>
       <div style="padding-top: 10px;width: 180px;">
         <button class="btn btn-primary btn-lg btn-block" ng-click="enableCache()">
-          <label tooltip="Disable Query Cache" style="cursor: pointer;">Enable Cache</label>
+          <label tooltip="Enable Query Cache" style="cursor: pointer;">Enable Cache</label>
         </button>
       </div>
       <div style="padding-top: 10px;width: 180px;">