You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/14 23:37:04 UTC
kylin git commit: KYLIN-1226 A new way to estimate aggr cache size,
MemoryWaterLevel
Repository: kylin
Updated Branches:
refs/heads/2.0-rc af883ef39 -> 949efc558
KYLIN-1226 A new way to estimate aggr cache size, MemoryWaterLevel
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/949efc55
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/949efc55
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/949efc55
Branch: refs/heads/2.0-rc
Commit: 949efc5580f5507f548f935653c74deab9a8007b
Parents: af883ef
Author: Yang Li <li...@apache.org>
Authored: Tue Dec 15 06:36:17 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Dec 15 06:36:17 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 22 +++++++++
.../cube/inmemcubing/DoggedCubeBuilder.java | 9 +---
.../cube/inmemcubing/InMemCubeBuilder.java | 52 +++++++++-----------
.../kylin/gridtable/GTAggregateScanner.java | 14 +++++-
.../engine/mr/steps/InMemCuboidMapper.java | 2 +-
5 files changed, 60 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index d94f32b..37b0bd4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -271,4 +271,26 @@ public class MemoryBudgetController {
return (int) (getSystemAvailBytes() / ONE_MB);
}
+
+ // Protective estimate of memory usage between a low-usage mark and a high-usage mark; prefers to overestimate rather than underestimate.
+ public static class MemoryWaterLevel {
+ int lowAvail = Integer.MAX_VALUE; // smallest available-MB reading seen by markHigh(); MAX_VALUE means "never marked"
+ int highAvail = Integer.MIN_VALUE; // largest available-MB reading seen by markLow(); MIN_VALUE means "never marked"
+
+ public void markHigh() {
+ // Memory usage is expected to be high here: sample available memory without forcing a gc and keep the minimum across calls.
+ lowAvail = Math.min(lowAvail, MemoryBudgetController.getSystemAvailMB());
+ logger.info("Lower system avail " + lowAvail + " MB in markHigh()");
+ }
+
+ public void markLow() {
+ // Memory usage is expected to be low here: sample available memory after a gc and keep the maximum across calls.
+ highAvail = Math.max(highAvail, MemoryBudgetController.gcAndGetSystemAvailMB());
+ logger.info("Higher system avail " + highAvail + " MB in markLow()");
+ }
+
+ public int getEstimateMB() {
+ return highAvail - lowAvail; // estimate in MB; not meaningful (may be negative or overflow int) until both markLow() and markHigh() have been called
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 7f6e173..36a45e1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -73,15 +73,10 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
}
private class BuildOnce {
- private int cutAheadMB;
BuildOnce() {
int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
- // InMemCubeBuilder will over-estimate base cuboid size by a factor, must cut ahead at least the same factor
- cutAheadMB = (int) (systemAvailMB * InMemCubeBuilder.getAggrCacheOversizeFactor(cubeDesc));
- logger.info("Cut ahead MB is " + cutAheadMB);
-
int half = systemAvailMB / 2;
if (getReserveMemoryMB() > half) {
logger.info("Reserve " + getReserveMemoryMB() + " MB is more than half of system avail " + systemAvailMB + " MB, override to " + half);
@@ -264,8 +259,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
return true;
}
- if (systemAvailMB <= reserveMemoryMB + cutAheadMB) {
- logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB + cut ahead " + cutAheadMB + " MB");
+ if (systemAvailMB <= reserveMemoryMB) {
+ logger.info("Split cut due to hitting memory threshold, system avail " + systemAvailMB + " MB <= reserve " + reserveMemoryMB + " MB");
return true;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 97c96fb..2cc298b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -32,6 +32,7 @@ import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -61,7 +62,10 @@ import com.google.common.collect.Lists;
public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
- private static final double BASE_CUBOID_CACHE_OVERSIZE_FACTOR = 0.15;
+
+ // by experience
+ private static final double DERIVE_AGGR_CACHE_CONSTANT_FACTOR = 0.1;
+ private static final double DERIVE_AGGR_CACHE_VARIABLE_FACTOR = 0.9;
private final CuboidScheduler cuboidScheduler;
private final long baseCuboidId;
@@ -72,6 +76,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private final int measureCount;
private MemoryBudgetController memBudget;
+ private MemoryWaterLevel baseCuboidMemTracker;
+
private Thread[] taskThreads;
private Throwable[] taskThreadExceptions;
private TreeSet<CuboidTask> taskPending;
@@ -146,6 +152,9 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
long startTime = System.currentTimeMillis();
logger.info("In Mem Cube Build start, " + cubeDesc.getName());
+ baseCuboidMemTracker = new MemoryWaterLevel();
+ baseCuboidMemTracker.markLow();
+
// multiple threads to compute cuboid in parallel
taskPending = new TreeSet<CuboidTask>();
taskCuboidCompleted.set(0);
@@ -160,6 +169,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
return;
// plan memory budget
+ baseCuboidMemTracker.markLow();
makeMemoryBudget();
// kick off N-D cuboid tasks and output
@@ -299,6 +309,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
private void makeMemoryBudget() {
+ baseResult.aggrCacheMB = Math.max(baseCuboidMemTracker.getEstimateMB(), 10); // 10 MB at minimal
+ logger.info("Base cuboid aggr cache is " + baseResult.aggrCacheMB + " MB");
int systemAvailMB = getSystemAvailMB();
logger.info("System avail " + systemAvailMB + " MB");
int reserve = reserveMemoryMB;
@@ -308,7 +320,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
if (budget < baseResult.aggrCacheMB) {
// make sure we have base aggr cache as minimal
budget = baseResult.aggrCacheMB;
- logger.warn("!!! System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
+ logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
}
logger.info("Memory Budget is " + budget + " MB");
@@ -316,11 +328,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
- int mbBefore = getSystemAvailMB();
- int mbAfter = 0;
-
long startTime = System.currentTimeMillis();
- logger.info("Calculating base cuboid " + baseCuboidId + ", system avail " + mbBefore + " MB");
+ logger.info("Calculating base cuboid " + baseCuboidId);
GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
GTBuilder baseBuilder = baseCuboid.rebuild();
@@ -329,11 +338,12 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, true);
+ aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
int count = 0;
for (GTRecord r : aggregationScanner) {
- if (mbAfter == 0) {
- mbAfter = getSystemAvailMB();
+ if (count == 0) {
+ baseCuboidMemTracker.markHigh();
}
baseBuilder.write(r);
count++;
@@ -344,31 +354,16 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
long timeSpent = System.currentTimeMillis() - startTime;
logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
- int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
- int mbBaseAggrCache = (int) (mbBaseAggrCacheOnHeap * (1 + getAggrCacheOversizeFactor(cubeDesc)));
- mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be at least 10 MB
- logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)");
+ logger.info("Wild esitmate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
- return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
+ return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0);
}
-
- // Aggregation cache need to be oversized such that spawned thread not only has memory for
- // aggregation cache but also has enough for temporary measures and others.
- public static double getAggrCacheOversizeFactor(CubeDesc cubeDesc) {
- double r = BASE_CUBOID_CACHE_OVERSIZE_FACTOR;
- for (MeasureDesc m : cubeDesc.getMeasures()) {
- if (m.getFunction().isCountDistinct())
- r += BASE_CUBOID_CACHE_OVERSIZE_FACTOR;
- }
- return r;
- }
-
-
private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
- if (aggrCacheMB <= 0) {
- aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
+ if (aggrCacheMB <= 0 && baseResult != null) {
+ aggrCacheMB = (int) Math.ceil( //
+ (DERIVE_AGGR_CACHE_CONSTANT_FACTOR + DERIVE_AGGR_CACHE_VARIABLE_FACTOR * nRows / baseResult.nRows) * baseResult.aggrCacheMB);
}
CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
@@ -552,6 +547,5 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
public int getScannedRowCount() {
return 0;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cd2881e..0ed320c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -10,6 +10,7 @@ import java.util.SortedMap;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
import org.apache.kylin.metadata.measure.MeasureAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,7 +22,6 @@ public class GTAggregateScanner implements IGTScanner {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
- private int aggregatedRowCount = 0;
final GTInfo info;
final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
@@ -32,6 +32,9 @@ public class GTAggregateScanner implements IGTScanner {
final AggregationCache aggrCache;
final boolean enableMemCheck;
+ private int aggregatedRowCount = 0;
+ private MemoryWaterLevel memTracker;
+
public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, boolean enableMemCheck) {
if (req.hasAggregation() == false)
throw new IllegalStateException();
@@ -46,6 +49,10 @@ public class GTAggregateScanner implements IGTScanner {
this.enableMemCheck = enableMemCheck;
}
+ public void trackMemoryLevel(MemoryWaterLevel tracker) {
+ this.memTracker = tracker; // optional; when non-null and enableMemCheck is true, aggregate() calls tracker.markHigh() on every 1000th aggregated row
+ }
+
@Override
public GTInfo getInfo() {
return info;
@@ -141,7 +148,10 @@ public class GTAggregateScanner implements IGTScanner {
}
void aggregate(GTRecord r) {
- if (enableMemCheck && (++aggregatedRowCount % 100000 == 0)) {
+ if (enableMemCheck && (++aggregatedRowCount % 1000 == 0)) {
+ if (memTracker != null) {
+ memTracker.markHigh();
+ }
long estimated = estimatedMemSize();
if (estimated > 10 * MemoryBudgetController.ONE_GB) {
throw new RuntimeException("AggregationCache exceed 10GB, estimated size is: " + estimated);
http://git-wip-us.apache.org/repos/asf/kylin/blob/949efc55/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index d724c76..1e29066 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -46,7 +46,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
private IMRTableInputFormat flatTableInputFormat;
private int counter;
- private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
+ private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64);
private Future<?> future;
@Override