You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/14 23:37:04 UTC

kylin git commit: KYLIN-1226 A new way to estimate aggr cache size, MemoryWaterLevel

Repository: kylin
Updated Branches:
  refs/heads/2.0-rc af883ef39 -> 949efc558


KYLIN-1226 A new way to estimate aggr cache size, MemoryWaterLevel


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/949efc55
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/949efc55
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/949efc55

Branch: refs/heads/2.0-rc
Commit: 949efc5580f5507f548f935653c74deab9a8007b
Parents: af883ef
Author: Yang Li <li...@apache.org>
Authored: Tue Dec 15 06:36:17 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Dec 15 06:36:17 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 22 +++++++++
 .../cube/inmemcubing/DoggedCubeBuilder.java     |  9 +---
 .../cube/inmemcubing/InMemCubeBuilder.java      | 52 +++++++++-----------
 .../kylin/gridtable/GTAggregateScanner.java     | 14 +++++-
 .../engine/mr/steps/InMemCuboidMapper.java      |  2 +-
 5 files changed, 60 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index d94f32b..37b0bd4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -271,4 +271,26 @@ public class MemoryBudgetController {
         return (int) (getSystemAvailBytes() / ONE_MB);
     }
 
+
+    // protective estimate of memory usage, prefer overestimate rather than underestimate
+    public static class MemoryWaterLevel {
+        int lowAvail = Integer.MAX_VALUE;
+        int highAvail = Integer.MIN_VALUE;
+
+        public void markHigh() {
+            // get avail mem without gc
+            lowAvail = Math.min(lowAvail, MemoryBudgetController.getSystemAvailMB());
+            logger.info("Lower system avail " + lowAvail + " MB in markHigh()");
+        }
+
+        public void markLow() {
+            // get avail mem after gc
+            highAvail = Math.max(highAvail, MemoryBudgetController.gcAndGetSystemAvailMB());
+            logger.info("Higher system avail " + highAvail + " MB in markLow()");
+        }
+
+        public int getEstimateMB() {
+            return highAvail - lowAvail;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 7f6e173..36a45e1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -73,15 +73,10 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private class BuildOnce {
-        private int cutAheadMB;
 
         BuildOnce() {
             int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
 
-            // InMemCubeBuilder will over-estimate base cuboid size by a factor, must cut ahead at least the same factor
-            cutAheadMB = (int) (systemAvailMB * InMemCubeBuilder.getAggrCacheOversizeFactor(cubeDesc));
-            logger.info("Cut ahead MB is " + cutAheadMB);
-
             int half = systemAvailMB / 2;
             if (getReserveMemoryMB() > half) {
                 logger.info("Reserve " + getReserveMemoryMB() + " MB is more than half of system avail " + systemAvailMB + " MB, override to " + half);
@@ -264,8 +259,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 return true;
             }
 
-            if (systemAvailMB <= reserveMemoryMB + cutAheadMB) {
-                logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB + cut ahead " + cutAheadMB + " MB");
+            if (systemAvailMB <= reserveMemoryMB) {
+                logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB");
                 return true;
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 97c96fb..2cc298b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -32,6 +32,7 @@ import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -61,7 +62,10 @@ import com.google.common.collect.Lists;
 public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
-    private static final double BASE_CUBOID_CACHE_OVERSIZE_FACTOR = 0.15;
+
+    // by experience
+    private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1;
+    private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9;
 
     private final CuboidScheduler cuboidScheduler;
     private final long baseCuboidId;
@@ -72,6 +76,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private final int measureCount;
 
     private MemoryBudgetController memBudget;
+    private MemoryWaterLevel baseCuboidMemTracker;
+
     private Thread[] taskThreads;
     private Throwable[] taskThreadExceptions;
     private TreeSet<CuboidTask> taskPending;
@@ -146,6 +152,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         long startTime = System.currentTimeMillis();
         logger.info("In Mem Cube Build start, " + cubeDesc.getName());
 
+        baseCuboidMemTracker = new MemoryWaterLevel();
+        baseCuboidMemTracker.markLow();
+
         // multiple threads to compute cuboid in parallel
         taskPending = new TreeSet<CuboidTask>();
         taskCuboidCompleted.set(0);
@@ -160,6 +169,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             return;
 
         // plan memory budget
+        baseCuboidMemTracker.markLow();
         makeMemoryBudget();
 
         // kick off N-D cuboid tasks and output
@@ -299,6 +309,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private void makeMemoryBudget() {
+        baseResult.aggrCacheMB = Math.max(baseCuboidMemTracker.getEstimateMB(), 10); // 10 MB at minimal
+        logger.info("Base cuboid aggr cache is " + baseResult.aggrCacheMB + " MB");
         int systemAvailMB = getSystemAvailMB();
         logger.info("System avail " + systemAvailMB + " MB");
         int reserve = reserveMemoryMB;
@@ -308,7 +320,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         if (budget < baseResult.aggrCacheMB) {
             // make sure we have base aggr cache as minimal
             budget = baseResult.aggrCacheMB;
-            logger.warn("!!! System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
+            logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
         }
 
         logger.info("Memory Budget is " + budget + " MB");
@@ -316,11 +328,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
-        int mbBefore = getSystemAvailMB();
-        int mbAfter = 0;
-
         long startTime = System.currentTimeMillis();
-        logger.info("Calculating base cuboid " + baseCuboidId + ", system avail " + mbBefore + " MB");
+        logger.info("Calculating base cuboid " + baseCuboidId);
 
         GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
         GTBuilder baseBuilder = baseCuboid.rebuild();
@@ -329,11 +338,12 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
         GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
         GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, true);
+        aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
 
         int count = 0;
         for (GTRecord r : aggregationScanner) {
-            if (mbAfter == 0) {
-                mbAfter = getSystemAvailMB();
+            if (count == 0) {
+                baseCuboidMemTracker.markHigh();
             }
             baseBuilder.write(r);
             count++;
@@ -344,31 +354,16 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         long timeSpent = System.currentTimeMillis() - startTime;
         logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
 
-        int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
         int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
-        int mbBaseAggrCache = (int) (mbBaseAggrCacheOnHeap * (1 + getAggrCacheOversizeFactor(cubeDesc)));
-        mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be at least 10 MB
-        logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)");
+        logger.info("Wild esitmate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
 
-        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0);
     }
-    
-    // Aggregation cache need to be oversized such that spawned thread not only has memory for 
-    // aggregation cache but also has enough for temporary measures and others.
-    public static double getAggrCacheOversizeFactor(CubeDesc cubeDesc) {
-        double r = BASE_CUBOID_CACHE_OVERSIZE_FACTOR;
-        for (MeasureDesc m : cubeDesc.getMeasures()) {
-            if (m.getFunction().isCountDistinct())
-                r += BASE_CUBOID_CACHE_OVERSIZE_FACTOR;
-        }
-        return r;
-    }
-
-
 
     private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
-        if (aggrCacheMB <= 0) {
-            aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
+        if (aggrCacheMB <= 0 && baseResult != null) {
+            aggrCacheMB = (int) Math.ceil( //
+                    (DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) * baseResult.aggrCacheMB);
         }
 
         CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
@@ -552,6 +547,5 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         public int getScannedRowCount() {
             return 0;
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cd2881e..0ed320c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -10,6 +10,7 @@ import java.util.SortedMap;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -21,7 +22,6 @@ public class GTAggregateScanner implements IGTScanner {
 
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
-    private int aggregatedRowCount = 0;
 
     final GTInfo info;
     final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
@@ -32,6 +32,9 @@ public class GTAggregateScanner implements IGTScanner {
     final AggregationCache aggrCache;
     final boolean enableMemCheck;
 
+    private int aggregatedRowCount = 0;
+    private MemoryWaterLevel memTracker;
+
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) {
         if (req.hasAggregation() == false)
             throw new IllegalStateException();
@@ -46,6 +49,10 @@ public class GTAggregateScanner implements IGTScanner {
         this.enableMemCheck = enableMemCheck;
     }
 
+    public void trackMemoryLevel(MemoryWaterLevel tracker) {
+        this.memTracker = tracker;
+    }
+
     @Override
     public GTInfo getInfo() {
         return info;
@@ -141,7 +148,10 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         void aggregate(GTRecord r) {
-            if (enableMemCheck && (++aggregatedRowCount % 100000 == 0)) {
+            if (enableMemCheck && (++aggregatedRowCount % 1000 == 0)) {
+                if (memTracker != null) {
+                    memTracker.markHigh();
+                }
                 long estimated = estimatedMemSize();
                 if (estimated > 10 * MemoryBudgetController.ONE_GB) {
                     throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated);

http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index d724c76..1e29066 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -46,7 +46,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
     private IMRTableInputFormat flatTableInputFormat;
 
     private int counter;
-    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
+    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64);
     private Future<?> future;
 
     @Override