You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/27 07:37:28 UTC
[1/3] kylin git commit: KYLIN-1233 Spill to disk when
AggregationCache needs too much memory
Repository: kylin
Updated Branches:
refs/heads/2.x-staging 35a5d87af -> e58326999
KYLIN-1233 Spill to disk when AggregationCache needs too much memory
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/eaed4f6b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/eaed4f6b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/eaed4f6b
Branch: refs/heads/2.x-staging
Commit: eaed4f6bdf63385590b02360e0f5834478088db2
Parents: 35a5d87
Author: lidongsjtu <do...@ebay.com>
Authored: Sun Dec 27 11:05:11 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 27 11:05:11 2015 +0800
----------------------------------------------------------------------
.../common/hll/HyperLogLogPlusCounter.java | 2 +-
.../java/org/apache/kylin/common/util/Pair.java | 46 ++-
.../cube/inmemcubing/InMemCubeBuilder.java | 2 +-
.../org/apache/kylin/cube/util/KryoUtils.java | 2 +-
.../kylin/gridtable/GTAggregateScanner.java | 406 +++++++++++++++----
.../apache/kylin/gridtable/GTScanRequest.java | 17 +-
.../coprocessor/endpoint/CubeVisitService.java | 10 +-
7 files changed, 377 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index ef91509..11ae78f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -42,7 +42,7 @@ public class HyperLogLogPlusCounter implements Serializable, Comparable<HyperLog
private final int p;
private final int m;
- private final HashFunction hashFunc;
+ private transient final HashFunction hashFunc;
byte[] registers;
public HyperLogLogPlusCounter() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
index d28b05f..9e4e9ee 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
*/
public class Pair<T1, T2> implements Serializable {
private static final long serialVersionUID = -3986244606585552569L;
-
+
protected T1 first = null;
protected T2 second = null;
@@ -60,6 +60,18 @@ public class Pair<T1, T2> implements Serializable {
return new Pair<T1, T2>(a, b);
}
+ private static boolean equals(Object x, Object y) {
+ return (x == null && y == null) || (x != null && x.equals(y));
+ }
+
+ /**
+ * Return the first element stored in the pair.
+ * @return T1
+ */
+ public T1 getFirst() {
+ return first;
+ }
+
/**
* Replace the first element of the pair.
* @param a operand
@@ -68,20 +80,12 @@ public class Pair<T1, T2> implements Serializable {
this.first = a;
}
- /**
- * Replace the second element of the pair.
- * @param b operand
- */
- public void setSecond(T2 b) {
- this.second = b;
+ public T1 getKey() {
+ return getFirst();
}
- /**
- * Return the first element stored in the pair.
- * @return T1
- */
- public T1 getFirst() {
- return first;
+ public void setKey(T1 a) {
+ setFirst(a);
}
/**
@@ -92,8 +96,20 @@ public class Pair<T1, T2> implements Serializable {
return second;
}
- private static boolean equals(Object x, Object y) {
- return (x == null && y == null) || (x != null && x.equals(y));
+ /**
+ * Replace the second element of the pair.
+ * @param b operand
+ */
+ public void setSecond(T2 b) {
+ this.second = b;
+ }
+
+ public T2 getValue() {
+ return getSecond();
+ }
+
+ public void setValue(T2 b) {
+ setSecond(b);
}
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 575583f..4bad818 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -330,7 +330,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
- GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, true);
+ GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
int count = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
index 9dbe0d2..48f925a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/KryoUtils.java
@@ -48,7 +48,7 @@ public class KryoUtils {
return deserialize(bytes, clazz);
}
- private static Kryo getKryo() {
+ public static Kryo getKryo() {
if (_Kryo.get() == null) {
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 658a08f..a760b92 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -1,26 +1,37 @@
package org.apache.kylin.gridtable;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
+import java.util.PriorityQueue;
import java.util.SortedMap;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.util.KryoUtils;
import org.apache.kylin.measure.MeasureAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class GTAggregateScanner implements IGTScanner {
- @SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
final GTInfo info;
@@ -30,13 +41,13 @@ public class GTAggregateScanner implements IGTScanner {
final String[] metricsAggrFuncs;
final IGTScanner inputScanner;
final AggregationCache aggrCache;
- final boolean enableMemCheck;
+ final long spillThreshold;
private int aggregatedRowCount = 0;
private MemoryWaterLevel memTracker;
- public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) {
- if (req.hasAggregation() == false)
+ public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+ if (!req.hasAggregation())
throw new IllegalStateException();
this.info = inputScanner.getInfo();
@@ -46,7 +57,31 @@ public class GTAggregateScanner implements IGTScanner {
this.metricsAggrFuncs = req.getAggrMetricsFuncs();
this.inputScanner = inputScanner;
this.aggrCache = new AggregationCache();
- this.enableMemCheck = enableMemCheck;
+ this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB);
+ }
+
+ public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
+ // Aggregation cache is basically a tree map. The tree map entry overhead is
+ // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
+ // - 41~52 according to AggregationCacheMemSizeTest
+ return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
+ }
+
+ public static long estimateSizeOf(MeasureAggregator[] aggrs) {
+ // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
+ // Memory alignment to 8 bytes
+ long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
+ for (MeasureAggregator aggr : aggrs) {
+ if (aggr != null)
+ est += aggr.getMemBytesEstimate();
+ }
+ return est;
+ }
+
+ public static long estimateSizeOf(byte[] bytes) {
+ // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
+ // Memory alignment to 8 bytes
+ return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
}
public void trackMemoryLevel(MemoryWaterLevel tracker) {
@@ -66,6 +101,7 @@ public class GTAggregateScanner implements IGTScanner {
@Override
public void close() throws IOException {
inputScanner.close();
+ aggrCache.close();
}
@Override
@@ -76,40 +112,51 @@ public class GTAggregateScanner implements IGTScanner {
return aggrCache.iterator();
}
+ public int getNumOfSpills() {
+ return aggrCache.dumps.size();
+ }
+
/** return the estimate memory size of aggregation cache */
public long getEstimateSizeOfAggrCache() {
return aggrCache.estimatedMemSize();
}
- class AggregationCache {
- final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+ class AggregationCache implements Closeable {
+ final List<Dump> dumps;
final int keyLength;
final boolean[] compareMask;
- public AggregationCache() {
- compareMask = createCompareMask();
- keyLength = compareMask.length;
- aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
- @Override
- public int compare(byte[] o1, byte[] o2) {
- int result = 0;
- // profiler shows this check is slow
- // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
- for (int i = 0; i < keyLength; ++i) {
- if (compareMask[i]) {
- int a = (o1[i] & 0xff);
- int b = (o2[i] & 0xff);
- result = a - b;
- if (result == 0) {
- continue;
- } else {
- return result;
- }
+ final Kryo kryo = KryoUtils.getKryo();
+
+ final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ int result = 0;
+ // profiler shows this check is slow
+ // Preconditions.checkArgument(keyLength == o1.length && keyLength == o2.length);
+ for (int i = 0; i < keyLength; ++i) {
+ if (compareMask[i]) {
+ int a = (o1[i] & 0xff);
+ int b = (o2[i] & 0xff);
+ result = a - b;
+ if (result == 0) {
+ continue;
+ } else {
+ return result;
}
}
- return result;
}
- });
+ return result;
+ }
+ };
+
+ SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+
+ public AggregationCache() {
+ compareMask = createCompareMask();
+ keyLength = compareMask.length;
+ dumps = Lists.newArrayList();
+ aggBufMap = createBuffMap();
}
private boolean[] createCompareMask() {
@@ -133,6 +180,10 @@ public class GTAggregateScanner implements IGTScanner {
return mask;
}
+ private SortedMap<byte[], MeasureAggregator[]> createBuffMap() {
+ return Maps.newTreeMap(bytesComparator);
+ }
+
private byte[] createKey(GTRecord record) {
byte[] result = new byte[keyLength];
int offset = 0;
@@ -148,13 +199,15 @@ public class GTAggregateScanner implements IGTScanner {
}
void aggregate(GTRecord r) {
- if (enableMemCheck && (++aggregatedRowCount % 1000 == 0)) {
+ if (++aggregatedRowCount % 1000 == 0) {
if (memTracker != null) {
memTracker.markHigh();
}
- long estimated = estimatedMemSize();
- if (estimated > 10 * MemoryBudgetController.ONE_GB) {
- throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated);
+ if (spillThreshold > 0) {
+ // spill to disk when aggBufMap used too large memory
+ if (estimatedMemSize() > spillThreshold) {
+ spillBuffMap();
+ }
}
}
@@ -171,6 +224,31 @@ public class GTAggregateScanner implements IGTScanner {
}
}
+ private void spillBuffMap() throws RuntimeException {
+ if (aggBufMap.isEmpty())
+ return;
+
+ try {
+ Dump dump = new Dump(aggBufMap);
+ dump.flush();
+ dumps.add(dump);
+ aggBufMap = createBuffMap();
+ } catch (Exception e) {
+ throw new RuntimeException("AggregationCache spill failed: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void close() throws RuntimeException {
+ try {
+ for (Dump dump : dumps) {
+ dump.terminate();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("AggregationCache close failed: " + e.getMessage());
+ }
+ }
+
private MeasureAggregator[] newAggregators() {
return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
}
@@ -185,71 +263,233 @@ public class GTAggregateScanner implements IGTScanner {
}
public Iterator<GTRecord> iterator() {
- return new Iterator<GTRecord>() {
+ // the all-in-mem case
+ if (dumps.isEmpty()) {
+ return new Iterator<GTRecord>() {
+
+ final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+ final ReturningRecord returningRecord = new ReturningRecord();
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
- final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+ @Override
+ public GTRecord next() {
+ Entry<byte[], MeasureAggregator[]> entry = it.next();
+ returningRecord.load(entry.getKey(), entry.getValue());
+ return returningRecord.record;
+ }
- final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
- final GTRecord secondRecord = new GTRecord(info);
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ // the spill case
+ else {
+ logger.info("Last spill, current AggregationCache memory estimated size is: " + getEstimateSizeOfAggrCache());
+ this.spillBuffMap();
+
+ return new Iterator<GTRecord>() {
+ final DumpMerger merger = new DumpMerger(dumps);
+ final Iterator<Pair<byte[], MeasureAggregator[]>> it = merger.iterator();
+ final ReturningRecord returningRecord = new ReturningRecord();
+
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
- @Override
- public boolean hasNext() {
- return it.hasNext();
+ @Override
+ public GTRecord next() {
+ Pair<byte[], MeasureAggregator[]> entry = it.next();
+ returningRecord.load(entry.getKey(), entry.getValue());
+ return returningRecord.record;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
+
+ class ReturningRecord {
+ final GTRecord record = new GTRecord(info);
+ final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+
+ void load(byte[] key, MeasureAggregator[] value) {
+ int offset = 0;
+ for (int i = 0; i < dimensions.trueBitCount(); i++) {
+ int c = dimensions.trueBitAt(i);
+ final int columnLength = info.codeSystem.maxCodeLength(c);
+ record.cols[c].set(key, offset, columnLength);
+ offset += columnLength;
+ }
+ metricsBuf.clear();
+ for (int i = 0; i < value.length; i++) {
+ int col = metrics.trueBitAt(i);
+ int pos = metricsBuf.position();
+ info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
+ record.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
}
+ }
+ }
+
+ class Dump implements Iterable<Pair<byte[], MeasureAggregator[]>> {
+ File dumpedFile;
+ Input input;
+ SortedMap<byte[], MeasureAggregator[]> buffMap;
+
+ public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException {
+ this.buffMap = buffMap;
+ }
+
+ @Override
+ public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
+ try {
+ if (dumpedFile == null || !dumpedFile.exists()) {
+ throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
+ }
+
+ input = new Input(new FileInputStream(dumpedFile));
+
+ final int count = kryo.readObject(input, Integer.class);
+ return new Iterator<Pair<byte[], MeasureAggregator[]>>() {
+ int cursorIdx = 0;
+
+ @Override
+ public boolean hasNext() {
+ return cursorIdx < count;
+ }
+
+ @Override
+ public Pair<byte[], MeasureAggregator[]> next() {
+ try {
+ cursorIdx++;
+ return (Pair<byte[], MeasureAggregator[]>) kryo.readObject(input, Pair.class);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot read AggregationCache from dumped file: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to read dumped file: " + e.getMessage());
+ }
+ }
- @Override
- public GTRecord next() {
- Entry<byte[], MeasureAggregator[]> entry = it.next();
- create(entry.getKey(), entry.getValue());
- return secondRecord;
+ public void flush() throws IOException {
+ if (buffMap != null) {
+ Output output = null;
+ try {
+ dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp");
+
+ logger.info("AggregationCache will dump to file: " + dumpedFile.getAbsolutePath());
+ output = new Output(new FileOutputStream(dumpedFile));
+ kryo.writeObject(output, buffMap.size());
+ for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) {
+ kryo.writeObject(output, new Pair(entry.getKey(), entry.getValue()));
+ }
+ } finally {
+ buffMap = null;
+ if (output != null)
+ output.close();
+ }
}
+ }
- private void create(byte[] key, MeasureAggregator[] value) {
- int offset = 0;
- for (int i = 0; i < dimensions.trueBitCount(); i++) {
- int c = dimensions.trueBitAt(i);
- final int columnLength = info.codeSystem.maxCodeLength(c);
- secondRecord.set(c, new ByteArray(key, offset, columnLength));
- offset += columnLength;
+ public void terminate() throws IOException {
+ buffMap = null;
+ if (input != null)
+ input.close();
+ if (dumpedFile != null && dumpedFile.exists())
+ dumpedFile.delete();
+ }
+ }
+
+ class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>> {
+ final PriorityQueue<Pair<byte[], Integer>> minHeap;
+ final List<Iterator<Pair<byte[], MeasureAggregator[]>>> dumpIterators;
+ final List<MeasureAggregator[]> dumpCurrentValues;
+
+ public DumpMerger(List<Dump> dumps) {
+ minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Pair<byte[], Integer>>() {
+ @Override
+ public int compare(Pair<byte[], Integer> o1, Pair<byte[], Integer> o2) {
+ return bytesComparator.compare(o1.getFirst(), o2.getFirst());
}
- metricsBuf.clear();
- for (int i = 0; i < value.length; i++) {
- int col = metrics.trueBitAt(i);
- int pos = metricsBuf.position();
- info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
- secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+ });
+ dumpIterators = Lists.newArrayListWithCapacity(dumps.size());
+ dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size());
+
+ Iterator<Pair<byte[], MeasureAggregator[]>> it;
+ for (int i = 0; i < dumps.size(); i++) {
+ it = dumps.get(i).iterator();
+ if (it.hasNext()) {
+ dumpIterators.add(i, it);
+ Pair<byte[], MeasureAggregator[]> entry = it.next();
+ minHeap.offer(new Pair(entry.getKey(), i));
+ dumpCurrentValues.add(i, entry.getValue());
+ } else {
+ dumpIterators.add(i, null);
+ dumpCurrentValues.add(i, null);
}
}
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
+ private void enqueueFromDump(int index) {
+ if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext()) {
+ Pair<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next();
+ minHeap.offer(new Pair(entry.getKey(), index));
+ dumpCurrentValues.set(index, entry.getValue());
}
- };
- }
- }
+ }
- public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
- // Aggregation cache is basically a tree map. The tree map entry overhead is
- // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
- // - 41~52 according to AggregationCacheMemSizeTest
- return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
- }
+ @Override
+ public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
+ return new Iterator<Pair<byte[], MeasureAggregator[]>>() {
+ @Override
+ public boolean hasNext() {
+ return !minHeap.isEmpty();
+ }
- public static long estimateSizeOf(MeasureAggregator[] aggrs) {
- // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
- // Memory alignment to 8 bytes
- long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
- for (MeasureAggregator aggr : aggrs) {
- if (aggr != null)
- est += aggr.getMemBytesEstimate();
- }
- return est;
- }
+ @Override
+ public Pair<byte[], MeasureAggregator[]> next() {
+ // Use minimum heap to merge sort the keys,
+ // also do aggregation for measures with same keys in different dumps
+ Pair<byte[], Integer> peekEntry = minHeap.poll();
+ MeasureAggregator[] mergedAggr = dumpCurrentValues.get(peekEntry.getValue());
+ enqueueFromDump(peekEntry.getValue());
- public static long estimateSizeOf(byte[] bytes) {
- // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
- // Memory alignment to 8 bytes
- return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
+ while (!minHeap.isEmpty() && bytesComparator.compare(peekEntry.getKey(), minHeap.peek().getKey()) == 0) {
+ Pair<byte[], Integer> newPeek = minHeap.poll();
+
+ MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue());
+ for (int i = 0; i < newPeekAggr.length; i++) {
+ mergedAggr[i].aggregate(newPeekAggr[i].getState());
+ }
+
+ enqueueFromDump(newPeek.getValue());
+ }
+
+ return new Pair(peekEntry.getKey(), mergedAggr);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 2c284c9..ac99d4e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -29,6 +29,7 @@ public class GTScanRequest {
// hint to storage behavior
private boolean allowPreAggregation = true;
+ private double aggrCacheGB = 0; // no limit
public GTScanRequest(GTInfo info) {
this(info, null, null, null);
@@ -118,7 +119,7 @@ public class GTScanRequest {
}
public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
- return decorateScanner(scanner, true, true, false);//by default do not check mem
+ return decorateScanner(scanner, true, true);//by default do not check mem
}
/**
@@ -127,7 +128,7 @@ public class GTScanRequest {
*
* Refer to CoprocessorBehavior for explanation
*/
- public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, boolean doMemCheck) throws IOException {
+ public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
IGTScanner result = scanner;
if (!doFilter) { //Skip reading this section if you're not profiling!
lookAndForget(result);
@@ -144,13 +145,13 @@ public class GTScanRequest {
}
if (this.allowPreAggregation && this.hasAggregation()) {
- result = new GTAggregateScanner(result, this, doMemCheck);
+ result = new GTAggregateScanner(result, this);
}
return result;
}
}
- //touch every byte of the cell so that the cost of scanning will be trully reflected
+ //touch every byte of the cell so that the cost of scanning will be truly reflected
private void lookAndForget(IGTScanner scanner) {
byte meaninglessByte = 0;
for (GTRecord gtRecord : scanner) {
@@ -215,6 +216,14 @@ public class GTScanRequest {
return aggrMetricsFuncs;
}
+ public double getAggrCacheGB() {
+ return aggrCacheGB;
+ }
+
+ public void setAggrCacheGB(double gb) {
+ this.aggrCacheGB = gb;
+ }
+
@Override
public String toString() {
return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
http://git-wip-us.apache.org/repos/asf/kylin/blob/eaed4f6b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3759738..6feed33 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -147,14 +147,18 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
innerScanner = region.getScanner(scan);
InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
+ CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
+ if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+ if (scanReq.getAggrCacheGB() <= 0)
+ scanReq.setAggrCacheGB(10); // 10 GB threshold, inherit from v1.0
+ }
+
IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());
IGTScanner rawScanner = store.scan(scanReq);
- CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
IGTScanner finalScanner = scanReq.decorateScanner(rawScanner,//
behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(),//
- behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(),//
- behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal());//
+ behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
[2/3] kylin git commit: KYLIN-1233 Add testcases for spill and in-mem
of AggregationCache
Posted by li...@apache.org.
KYLIN-1233 Add testcases for spill and in-mem of AggregationCache
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a46a25dd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a46a25dd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a46a25dd
Branch: refs/heads/2.x-staging
Commit: a46a25dd096ad54add369e9067401229a797d11f
Parents: eaed4f6
Author: lidongsjtu <do...@ebay.com>
Authored: Sun Dec 27 14:34:06 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 27 14:34:06 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/gridtable/UnitTestSupport.java | 44 +++++-
.../gridtable/AggregationCacheSpillTest.java | 140 +++++++++++++++++++
2 files changed, 182 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/a46a25dd/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
index ff71b4f..08187e9 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo.Builder;
@@ -44,6 +45,23 @@ public class UnitTestSupport {
return info;
}
+ public static GTInfo hllInfo() {
+ Builder builder = GTInfo.builder();
+ builder.setCodeSystem(new GTSampleCodeSystem());
+ builder.setColumns( //
+ DataType.getType("varchar(10)"), //
+ DataType.getType("varchar(10)"), //
+ DataType.getType("varchar(10)"), //
+ DataType.getType("bigint"), //
+ DataType.getType("decimal"), //
+ DataType.getType("hllc14") //
+ );
+ builder.setPrimaryKey(setOf(0));
+ builder.setColumnPreferIndex(setOf(0));
+ GTInfo info = builder.build();
+ return info;
+ }
+
private static Builder infoBuilder() {
Builder builder = GTInfo.builder();
builder.setCodeSystem(new GTSampleCodeSystem());
@@ -81,15 +99,37 @@ public class UnitTestSupport {
return result;
}
+ public static List<GTRecord> mockupHllData(GTInfo info, int nRows) {
+ List<GTRecord> result = new ArrayList<GTRecord>(nRows);
+ int round = nRows / 10;
+ for (int i = 0; i < round; i++) {
+ String d_01_14 = datePlus("2015-01-14", i * 4);
+ String d_01_15 = datePlus("2015-01-15", i * 4);
+ String d_01_16 = datePlus("2015-01-16", i * 4);
+ String d_01_17 = datePlus("2015-01-17", i * 4);
+ result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5"), new HyperLogLogPlusCounter(14)));
+ }
+ return result;
+ }
+
private static String datePlus(String date, int plusDays) {
long millis = DateFormat.stringToMillis(date);
millis += (1000L * 3600L * 24L) * plusDays;
return DateFormat.formatToDateStr(millis);
}
- private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
+ private static GTRecord newRec(GTInfo info, Object... values) {
GTRecord rec = new GTRecord(info);
- return rec.setValues(date, name, category, amount, price);
+ return rec.setValues(values);
}
private static ImmutableBitSet setOf(int... values) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/a46a25dd/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
new file mode 100644
index 0000000..7fd5ce7
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.datatype.LongMutable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Created by dongli on 12/16/15.
+ */
+public class AggregationCacheSpillTest {
+
+ final static int DATA_CARDINALITY = 40000;
+ final static int DATA_REPLICATION = 2;
+
+ final static GTInfo INFO = UnitTestSupport.hllInfo();
+ final static List<GTRecord> TEST_DATA = Lists.newArrayListWithCapacity(DATA_CARDINALITY * DATA_REPLICATION);;
+
+ @BeforeClass
+ public static void beforeClass() {
+ System.setProperty("log4j.configuration", "kylin-log4j.properties");
+
+ final List<GTRecord> data = UnitTestSupport.mockupHllData(INFO, DATA_CARDINALITY);
+ for (int i = 0; i < DATA_REPLICATION; i++)
+ TEST_DATA.addAll(data);
+ }
+
+ @Test
+ public void testAggregationCacheSpill() throws IOException {
+ IGTScanner inputScanner = new IGTScanner() {
+ @Override
+ public GTInfo getInfo() {
+ return INFO;
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return TEST_DATA.iterator();
+ }
+ };
+
+ GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true);
+ scanRequest.setAggrCacheGB(0.5); // 500 MB
+
+ GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+
+ int count = 0;
+ for (GTRecord record : scanner) {
+ assertNotNull(record);
+ Object[] returnRecord = record.getValues();
+ assertEquals(20, ((LongMutable) returnRecord[3]).get());
+ assertEquals(21, ((BigDecimal) returnRecord[4]).longValue());
+ count++;
+
+ System.out.println(record);
+ }
+ assertEquals(DATA_CARDINALITY, count);
+ scanner.close();
+ }
+
+ @Test
+ public void testAggregationCacheInMem() throws IOException {
+ IGTScanner inputScanner = new IGTScanner() {
+ @Override
+ public GTInfo getInfo() {
+ return INFO;
+ }
+
+ @Override
+ public int getScannedRowCount() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public Iterator<GTRecord> iterator() {
+ return TEST_DATA.iterator();
+ }
+ };
+
+ // all-in-mem testcase
+ GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true);
+ scanRequest.setAggrCacheGB(0.5); // 500 MB
+
+ GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+
+ int count = 0;
+ for (GTRecord record : scanner) {
+ assertNotNull(record);
+ Object[] returnRecord = record.getValues();
+ assertEquals(80000, ((LongMutable) returnRecord[3]).get());
+ assertEquals(84000, ((BigDecimal) returnRecord[4]).longValue());
+ count++;
+
+ System.out.println(record);
+ }
+ assertEquals(10, count);
+ scanner.close();
+ }
+}
\ No newline at end of file
[3/3] kylin git commit: minor, fix typo
Posted by li...@apache.org.
minor, fix typo
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e5832699
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e5832699
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e5832699
Branch: refs/heads/2.x-staging
Commit: e58326999711bef09a823026038ec3adf0acf2ad
Parents: a46a25d
Author: lidongsjtu <do...@ebay.com>
Authored: Sun Dec 27 14:35:10 2015 +0800
Committer: lidongsjtu <do...@ebay.com>
Committed: Sun Dec 27 14:35:10 2015 +0800
----------------------------------------------------------------------
webapp/app/partials/admin/admin.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e5832699/webapp/app/partials/admin/admin.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/admin/admin.html b/webapp/app/partials/admin/admin.html
index 3e063a6..0ed5ab5 100644
--- a/webapp/app/partials/admin/admin.html
+++ b/webapp/app/partials/admin/admin.html
@@ -61,7 +61,7 @@
</div>
<div style="padding-top: 10px;width: 180px;">
<button class="btn btn-primary btn-lg btn-block" ng-click="enableCache()">
- <label tooltip="Disable Query Cache" style="cursor: pointer;">Enable Cache</label>
+ <label tooltip="Enable Query Cache" style="cursor: pointer;">Enable Cache</label>
</button>
</div>
<div style="padding-top: 10px;width: 180px;">