You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2018/02/04 14:32:09 UTC
kylin git commit: KYLIN-2929, speed up dump performance,
write dump file to disk in lazy way
Repository: kylin
Updated Branches:
refs/heads/master a2af43d40 -> 4a29d92e8
KYLIN-2929, speed up dump performance, write dump file to disk in lazy way
Signed-off-by: Billy Liu <bi...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4a29d92e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4a29d92e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4a29d92e
Branch: refs/heads/master
Commit: 4a29d92e8cb5e92dc97ac18181417893c0d79d9b
Parents: a2af43d
Author: 冯宇 <hz...@corp.netease.com>
Authored: Mon Jan 29 14:37:39 2018 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Sun Feb 4 22:31:31 2018 +0800
----------------------------------------------------------------------
.../kylin/gridtable/GTAggregateScanner.java | 70 ++++++++++++++++----
1 file changed, 57 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a29d92e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 9fba336..6dc5de3 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -18,6 +18,8 @@
package org.apache.kylin.gridtable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -25,6 +27,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
@@ -64,6 +68,7 @@ import com.google.common.collect.Maps;
public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
+ private static final int MAX_BUFFER_SIZE = 64 * 1024 * 1024;
final GTInfo info;
final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
@@ -73,7 +78,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
final IGTScanner inputScanner;
final BufferedMeasureCodec measureCodec;
final AggregationCache aggrCache;
- final long spillThreshold; // 0 means no memory control && no spill
+ long spillThreshold; // 0 means no memory control && no spill
final int storagePushDownLimit;//default to be Int.MAX
final StorageLimitLevel storageLimitLevel;
final boolean spillEnabled;
@@ -281,6 +286,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
final int keyLength;
final boolean[] compareMask;
boolean compareAll = true;
+ long sumSpilledSize = 0;
ByPassChecker byPassChecker = null;
final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
@@ -425,6 +431,18 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
Dump dump = new Dump(aggBufMap, estMemSize);
dump.flush();
dumps.add(dump);
+ sumSpilledSize += dump.size();
+ // Dumps are first kept in in-memory buffers; once their total size exceeds the remaining
+ // spillThreshold (roughly more than half of the original threshold), write all of them to disk.
+ if(sumSpilledSize > spillThreshold) {
+ for(Dump current : dumps) {
+ current.spill();
+ }
+ spillThreshold += sumSpilledSize;
+ sumSpilledSize = 0;
+ } else {
+ spillThreshold -= dump.size();
+ }
} catch (Exception e) {
throw new RuntimeException("AggregationCache failed to spill", e);
}
@@ -643,7 +661,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
final File dumpedFile;
SortedMap<byte[], MeasureAggregator[]> buffMap;
final long estMemSize;
-
+ byte[] spillBuffer;
DataInputStream dis;
public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap, long estMemSize) throws IOException {
@@ -660,7 +678,11 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
+ (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
}
- dis = new DataInputStream(new FileInputStream(dumpedFile));
+ if(spillBuffer == null) {
+ dis = new DataInputStream(new FileInputStream(dumpedFile));
+ } else {
+ dis = new DataInputStream(new ByteArrayInputStream(spillBuffer));
+ }
final int count = dis.readInt();
return new Iterator<Pair<byte[], byte[]>>() {
int cursorIdx = 0;
@@ -697,39 +719,61 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
}
}
+ public void spill() throws IOException {
+ if(spillBuffer == null) return;
+ OutputStream ops = new FileOutputStream(dumpedFile);
+ InputStream ips = new ByteArrayInputStream(spillBuffer);
+ IOUtils.copy(ips, ops);
+ spillBuffer = null;
+ IOUtils.closeQuietly(ips);
+ IOUtils.closeQuietly(ops);
+
+ logger.info("Spill buffer to disk, location: {}, size = {}.", dumpedFile.getAbsolutePath(),
+ dumpedFile.length());
+ }
+
+ public int size() {
+ return spillBuffer == null ? 0 : spillBuffer.length;
+ }
+
public void flush() throws IOException {
logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(),
estMemSize, spillThreshold, dumpedFile.getAbsolutePath());
-
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(MAX_BUFFER_SIZE);
if (buffMap != null) {
- DataOutputStream dos = null;
+ DataOutputStream bos = new DataOutputStream(baos);
Object[] aggrResult = null;
try {
- dos = new DataOutputStream(new FileOutputStream(dumpedFile));
- dos.writeInt(buffMap.size());
+ bos.writeInt(buffMap.size());
+
for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) {
MeasureAggregators aggs = new MeasureAggregators(entry.getValue());
aggrResult = new Object[metrics.trueBitCount()];
aggs.collectStates(aggrResult);
ByteBuffer metricsBuf = measureCodec.encode(aggrResult);
- dos.writeInt(entry.getKey().length);
- dos.write(entry.getKey());
- dos.writeInt(metricsBuf.position());
- dos.write(metricsBuf.array(), 0, metricsBuf.position());
+
+ bos.writeInt(entry.getKey().length);
+ bos.write(entry.getKey());
+ bos.writeInt(metricsBuf.position());
+ bos.write(metricsBuf.array(), 0, metricsBuf.position());
}
} finally {
buffMap = null;
- IOUtils.closeQuietly(dos);
+ IOUtils.closeQuietly(bos);
}
}
+ spillBuffer = baos.toByteArray();
+ IOUtils.closeQuietly(baos);
+ logger.info("Accurately spill data size = {}", spillBuffer.length);
}
public void terminate() throws IOException {
buffMap = null;
if (dis != null)
- dis.close();
+ IOUtils.closeQuietly(dis);
if (dumpedFile != null && dumpedFile.exists())
dumpedFile.delete();
+ spillBuffer = null;
}
}