You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2018/02/04 14:32:09 UTC

kylin git commit: KYLIN-2929, speed up dump performance, write dump file to disk in lazy way

Repository: kylin
Updated Branches:
  refs/heads/master a2af43d40 -> 4a29d92e8


KYLIN-2929, speed up dump performance, write dump file to disk in lazy way

Signed-off-by: Billy Liu <bi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4a29d92e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4a29d92e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4a29d92e

Branch: refs/heads/master
Commit: 4a29d92e8cb5e92dc97ac18181417893c0d79d9b
Parents: a2af43d
Author: 冯宇 <hz...@corp.netease.com>
Authored: Mon Jan 29 14:37:39 2018 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Sun Feb 4 22:31:31 2018 +0800

----------------------------------------------------------------------
 .../kylin/gridtable/GTAggregateScanner.java     | 70 ++++++++++++++++----
 1 file changed, 57 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4a29d92e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 9fba336..6dc5de3 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.gridtable;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -25,6 +27,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -64,6 +68,7 @@ import com.google.common.collect.Maps;
 public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
 
     private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
+    private static final int MAX_BUFFER_SIZE = 64 * 1024 * 1024;
 
     final GTInfo info;
     final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
@@ -73,7 +78,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
     final IGTScanner inputScanner;
     final BufferedMeasureCodec measureCodec;
     final AggregationCache aggrCache;
-    final long spillThreshold; // 0 means no memory control && no spill
+    long spillThreshold; // 0 means no memory control && no spill
     final int storagePushDownLimit;//default to be Int.MAX
     final StorageLimitLevel storageLimitLevel;
     final boolean spillEnabled;
@@ -281,6 +286,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
         final int keyLength;
         final boolean[] compareMask;
         boolean compareAll = true;
+        long sumSpilledSize = 0;
         ByPassChecker byPassChecker = null;
 
         final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
@@ -425,6 +431,18 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
                 Dump dump = new Dump(aggBufMap, estMemSize);
                 dump.flush();
                 dumps.add(dump);
+                sumSpilledSize += dump.size();
+                // When too much dump data is held in memory, write all buffered dumps to disk.
+                // spillThreshold shrinks as dumps are buffered, so this fires at ~half the original threshold.
+                if(sumSpilledSize > spillThreshold) {
+                    for(Dump current : dumps) {
+                        current.spill();
+                    }
+                    spillThreshold += sumSpilledSize;
+                    sumSpilledSize = 0;
+                } else {
+                    spillThreshold -= dump.size();
+                }
             } catch (Exception e) {
                 throw new RuntimeException("AggregationCache failed to spill", e);
             }
@@ -643,7 +661,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
             final File dumpedFile;
             SortedMap<byte[], MeasureAggregator[]> buffMap;
             final long estMemSize;
-
+            byte[] spillBuffer;
             DataInputStream dis;
 
             public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap, long estMemSize) throws IOException {
@@ -660,7 +678,11 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
                                 + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
                     }
 
-                    dis = new DataInputStream(new FileInputStream(dumpedFile));
+                    if(spillBuffer == null) {
+                        dis = new DataInputStream(new FileInputStream(dumpedFile));
+                    } else {
+                        dis = new DataInputStream(new ByteArrayInputStream(spillBuffer));
+                    }
                     final int count = dis.readInt();
                     return new Iterator<Pair<byte[], byte[]>>() {
                         int cursorIdx = 0;
@@ -697,39 +719,61 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
                 }
             }
 
+            public void spill() throws IOException {
+                if(spillBuffer == null) return;
+                OutputStream ops = new FileOutputStream(dumpedFile);
+                InputStream ips = new ByteArrayInputStream(spillBuffer);
+                IOUtils.copy(ips, ops);
+                spillBuffer = null;
+                IOUtils.closeQuietly(ips);
+                IOUtils.closeQuietly(ops);
+
+                logger.info("Spill buffer to disk, location: {}, size = {}.", dumpedFile.getAbsolutePath(),
+                    dumpedFile.length());
+            }
+
+            public int size() {
+                return spillBuffer == null ? 0 : spillBuffer.length;
+            }
+
             public void flush() throws IOException {
                 logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(),
                         estMemSize, spillThreshold, dumpedFile.getAbsolutePath());
-
+                ByteArrayOutputStream baos = new ByteArrayOutputStream(MAX_BUFFER_SIZE);
                 if (buffMap != null) {
-                    DataOutputStream dos = null;
+                    DataOutputStream bos = new DataOutputStream(baos);
                     Object[] aggrResult = null;
                     try {
-                        dos = new DataOutputStream(new FileOutputStream(dumpedFile));
-                        dos.writeInt(buffMap.size());
+                        bos.writeInt(buffMap.size());
+
                         for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet()) {
                             MeasureAggregators aggs = new MeasureAggregators(entry.getValue());
                             aggrResult = new Object[metrics.trueBitCount()];
                             aggs.collectStates(aggrResult);
                             ByteBuffer metricsBuf = measureCodec.encode(aggrResult);
-                            dos.writeInt(entry.getKey().length);
-                            dos.write(entry.getKey());
-                            dos.writeInt(metricsBuf.position());
-                            dos.write(metricsBuf.array(), 0, metricsBuf.position());
+
+                            bos.writeInt(entry.getKey().length);
+                            bos.write(entry.getKey());
+                            bos.writeInt(metricsBuf.position());
+                            bos.write(metricsBuf.array(), 0, metricsBuf.position());
                         }
                     } finally {
                         buffMap = null;
-                        IOUtils.closeQuietly(dos);
+                        IOUtils.closeQuietly(bos);
                     }
                 }
+                spillBuffer = baos.toByteArray();
+                IOUtils.closeQuietly(baos);
+                logger.info("Accurately spill data size = {}", spillBuffer.length);
             }
 
             public void terminate() throws IOException {
                 buffMap = null;
                 if (dis != null)
-                    dis.close();
+                    IOUtils.closeQuietly(dis);
                 if (dumpedFile != null && dumpedFile.exists())
                     dumpedFile.delete();
+                spillBuffer = null;
             }
         }