You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/31 15:55:02 UTC
incubator-kylin git commit: KYLIN-802,
Let InMemCubeBuilder build multiple cuboids in parallel
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 b4b108842 -> 8a80c17b9
KYLIN-802, Let InMemCubeBuilder build multiple cuboids in parallel
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8a80c17b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8a80c17b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8a80c17b
Branch: refs/heads/0.8.0
Commit: 8a80c17b9e4f37fda3040af60782f67a53fec42a
Parents: b4b1088
Author: Yang Li <li...@apache.org>
Authored: Sun May 31 21:54:52 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun May 31 21:54:52 2015 +0800
----------------------------------------------------------------------
.../common/util/MemoryBudgetController.java | 144 ------------
common/src/main/resources/log4j.properties | 2 +-
.../common/util/MemoryBudgetControllerTest.java | 60 -----
.../job/inmemcubing/InMemCubeBuilderTest.java | 6 +-
.../kylin/storage/cube/CubeGridTable.java | 1 +
.../apache/kylin/storage/gridtable/GTInfo.java | 11 +
.../gridtable/memstore/GTMemDiskStore.java | 19 +-
.../memstore/MemoryBudgetController.java | 233 +++++++++++++++++++
.../gridtable/AggregationCacheMemSizeTest.java | 2 +-
.../storage/gridtable/GTMemDiskStoreTest.java | 2 +-
.../gridtable/MemoryBudgetControllerTest.java | 81 +++++++
11 files changed, 343 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
deleted file mode 100644
index 82e503c..0000000
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.common.util;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemoryBudgetController {
-
- public static interface MemoryConsumer {
- // return number MB released
- int freeUp(int mb);
- }
-
- @SuppressWarnings("serial")
- public static class NotEnoughBudgetException extends IllegalStateException {
- }
-
- private static class ConsumerEntry {
- final MemoryConsumer consumer;
- int reservedMB;
-
- ConsumerEntry(MemoryConsumer consumer) {
- this.consumer = consumer;
- }
- }
-
- public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
- public static final int ONE_MB = 1024 * 1024;
-
- private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
-
- // all budget numbers are in MB
- private final int totalBudgetMB;
- private final AtomicInteger totalReservedMB;
- private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
-
- public MemoryBudgetController(int totalBudgetMB) {
- if (totalBudgetMB < 0)
- throw new IllegalArgumentException();
- if (totalBudgetMB > 0 && checkSystemAvailMB(totalBudgetMB) == false)
- throw new IllegalStateException();
-
- this.totalBudgetMB = totalBudgetMB;
- this.totalReservedMB = new AtomicInteger();
- }
-
- public int getTotalBudgetMB() {
- return totalBudgetMB;
- }
-
- public int getTotalReservedMB() {
- return totalReservedMB.get();
- }
-
- public int getRemainingBudgetMB() {
- return totalBudgetMB - totalReservedMB.get();
- }
-
- public void reserve(MemoryConsumer consumer, int requestMB) {
- if (totalBudgetMB == 0 && requestMB > 0)
- throw new NotEnoughBudgetException();
-
- ConsumerEntry entry = booking.get(consumer);
- if (entry == null) {
- booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
- entry = booking.get(consumer);
- }
-
- int delta = requestMB - entry.reservedMB;
-
- if (delta > 0) {
- checkFreeMemoryAndUpdateBooking(entry, delta);
- } else {
- updateBooking(entry, delta);
- }
- }
-
- synchronized private void updateBooking(ConsumerEntry entry, int delta) {
- totalReservedMB.addAndGet(delta);
- entry.reservedMB += delta;
- if (entry.reservedMB == 0) {
- booking.remove(entry.consumer);
- }
- if (delta < 0) {
- this.notify();
- }
- if (delta != 0) {
- logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
- }
- }
-
- private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
- while (true) {
- // if budget is not enough, try free up
- while (delta > totalBudgetMB - totalReservedMB.get()) {
- int freeUpToGo = delta;
- for (ConsumerEntry entry : booking.values()) {
- int mb = entry.consumer.freeUp(freeUpToGo);
- updateBooking(entry, -mb);
- freeUpToGo -= mb;
- if (freeUpToGo <= 0)
- break;
- }
- if (freeUpToGo > 0)
- throw new NotEnoughBudgetException();
- }
-
- if (checkSystemAvailMB(delta))
- break;
-
- try {
- synchronized (this) {
- logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
- this.wait(200);
- }
- } catch (InterruptedException e) {
- logger.error("Interrupted while wait free memory", e);
- }
- }
-
- updateBooking(consumer, delta);
- }
-
- private boolean checkSystemAvailMB(int mb) {
- return getSystemAvailMB() >= mb;
- }
-
- public static long getSystemAvailBytes() {
- Runtime runtime = Runtime.getRuntime();
- long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
- long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
- long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
- long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
- long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
- return availableMemory;
- }
-
- public static int getSystemAvailMB() {
- return (int) (getSystemAvailBytes() / ONE_MB);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/common/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.properties b/common/src/main/resources/log4j.properties
index 616f3a2..bfd65cf 100644
--- a/common/src/main/resources/log4j.properties
+++ b/common/src/main/resources/log4j.properties
@@ -3,7 +3,7 @@ log4j.rootLogger=INFO,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=L4J [%d{yyyy-MM-dd HH:mm:ss,SSS}][%p][%c] - %m%n
+log4j.appender.stdout.layout.ConversionPattern=L4J [%d{HH:mm:ss,SSS}][%t][%p][%C{1}] - %m%n
#log4j.logger.org.apache.hadoop=ERROR
log4j.logger.org.apache.kylin=DEBUG
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
deleted file mode 100644
index a523263..0000000
--- a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.kylin.common.util;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.junit.Test;
-
-public class MemoryBudgetControllerTest {
-
- @Test
- public void test() {
- int n = MemoryBudgetController.getSystemAvailMB() / 2;
- MemoryBudgetController mbc = new MemoryBudgetController(n);
-
- ArrayList<OneMB> mbList = new ArrayList<OneMB>();
- for (int i = 0; i < n; i++) {
- mbList.add(new OneMB(mbc));
- assertEquals(mbList.size(), mbc.getTotalReservedMB());
- }
-
- mbc.reserve(new OneMB(), n);
-
- for (int i = 0; i < n; i++) {
- assertEquals(null, mbList.get(i).data);
- }
-
- try {
- mbc.reserve(new OneMB(), 1);
- fail();
- } catch (IllegalStateException ex) {
- // expected
- }
- }
-
- class OneMB implements MemoryBudgetController.MemoryConsumer {
-
- byte[] data;
-
- OneMB() {
- }
-
- OneMB(MemoryBudgetController mbc) {
- mbc.reserve(this, 1);
- data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
- }
-
- @Override
- public int freeUp(int mb) {
- if (data != null) {
- data = null;
- return 1;
- } else {
- return 0;
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 0a0c97c..4e94ffa 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -75,6 +75,9 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
@Test
public void test() throws Exception {
+ final int inputRows = 70000;
+ final int threads = 4;
+
final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
@@ -82,10 +85,11 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
+ cubeBuilder.setConcurrentThreads(threads);
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(cubeBuilder);
- feedData(cube, flatTable, queue, 60000);
+ feedData(cube, flatTable, queue, inputRows);
try {
future.get();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
index bc33213..89bde96 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
@@ -62,6 +62,7 @@ public class CubeGridTable {
}
GTInfo.Builder builder = GTInfo.builder();
+ builder.setTableName("Cuboid " + cuboidId);
builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx));
builder.setColumns(mapping.getDataTypes());
builder.setPrimaryKey(mapping.getPrimaryKey());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index 6c9968f..5892296 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -14,6 +14,7 @@ public class GTInfo {
return new Builder();
}
+ String tableName;
IGTCodeSystem codeSystem;
// column schema
@@ -36,6 +37,10 @@ public class GTInfo {
private GTInfo() {
}
+ public String getTableName() {
+ return tableName;
+ }
+
public IGTCodeSystem getCodeSystem() {
return codeSystem;
}
@@ -181,6 +186,12 @@ public class GTInfo {
this.info = new GTInfo();
}
+ /** optional */
+ public Builder setTableName(String name) {
+ info.tableName = name;
+ return this;
+ }
+
/** required */
public Builder setCodeSystem(IGTCodeSystem cs) {
info.codeSystem = cs;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index 0b2c0a4..45a51ee 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -1,6 +1,6 @@
package org.apache.kylin.storage.gridtable.memstore;
-import static org.apache.kylin.common.util.MemoryBudgetController.*;
+import static org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -17,24 +17,23 @@ import java.nio.file.StandardOpenOption;
import java.util.BitSet;
import java.util.NoSuchElementException;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
import org.apache.kylin.storage.gridtable.GTInfo;
import org.apache.kylin.storage.gridtable.GTRecord;
import org.apache.kylin.storage.gridtable.GTRowBlock;
import org.apache.kylin.storage.gridtable.GTScanRequest;
import org.apache.kylin.storage.gridtable.IGTStore;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GTMemDiskStore implements IGTStore, Closeable {
private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
- private static final boolean debug = false;
+ private static final boolean debug = true;
private static final int STREAM_BUFFER_SIZE = 8192;
- private static final int MEM_CHUNK_SIZE_MB = 2;
+ private static final int MEM_CHUNK_SIZE_MB = 5;
private final GTInfo info;
private final Object lock; // all public methods that read/write object states are synchronized on this lock
@@ -111,7 +110,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
@Override
public String toString() {
- return "MemDiskStore@" + this.hashCode();
+ return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
}
private class Reader implements IGTStoreScanner {
@@ -481,8 +480,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
data = null;
synchronized (lock) {
asyncFlushDiskOffset += flushedLen; // bytes written in last loop
- if (debug)
- logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+ // if (debug)
+ // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
asyncFlushChunk = asyncFlushChunk.next;
}
@@ -556,7 +555,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
synchronized (lock) {
if (asyncFlushException != null)
throwAsyncException(asyncFlushException);
-
+
clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
new file mode 100644
index 0000000..c4efa75
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
@@ -0,0 +1,233 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBudgetController {
+
+ private static final boolean debug = true;
+
+ public static interface MemoryConsumer {
+ // return number MB released
+ int freeUp(int mb);
+ }
+
+ @SuppressWarnings("serial")
+ public static class NotEnoughBudgetException extends IllegalStateException {
+
+ public NotEnoughBudgetException() {
+ super();
+ }
+
+ public NotEnoughBudgetException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ private static class ConsumerEntry {
+ final MemoryConsumer consumer;
+ int reservedMB;
+
+ ConsumerEntry(MemoryConsumer consumer) {
+ this.consumer = consumer;
+ }
+ }
+
+ public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+ public static final int ONE_MB = 1024 * 1024;
+
+ private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+ // all budget numbers are in MB
+ private final int totalBudgetMB;
+ private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+ private int totalReservedMB;
+ private final ReentrantLock lock = new ReentrantLock();
+
+ public MemoryBudgetController(int totalBudgetMB) {
+ if (totalBudgetMB < 0)
+ throw new IllegalArgumentException();
+ if (totalBudgetMB > getSystemAvailMB())
+ throw new IllegalStateException();
+
+ this.totalBudgetMB = totalBudgetMB;
+ this.totalReservedMB = 0;
+ }
+
+ public int getTotalBudgetMB() {
+ return totalBudgetMB;
+ }
+
+ public int getTotalReservedMB() {
+ lock.lock();
+ try {
+ return totalReservedMB;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getRemainingBudgetMB() {
+ lock.lock();
+ try {
+ return totalBudgetMB - totalReservedMB;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+ long waitStart = 0;
+ while (true) {
+ try {
+ reserve(consumer, requestMB);
+ if (debug && waitStart > 0)
+ logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+ return;
+ } catch (NotEnoughBudgetException ex) {
+ // retry
+ }
+
+ if (waitStart == 0)
+ waitStart = System.currentTimeMillis();
+
+ synchronized (lock) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ throw new NotEnoughBudgetException(e);
+ }
+ }
+ }
+ }
+
+ /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
+ public void reserve(MemoryConsumer consumer, int requestMB) {
+ if (totalBudgetMB == 0 && requestMB > 0)
+ throw new NotEnoughBudgetException();
+
+ boolean ok = false;
+ while (!ok) {
+ int gap = calculateGap(consumer, requestMB);
+ if (gap > 0) {
+ // to avoid deadlock, don't hold lock when invoking consumer.freeUp()
+ tryFreeUp(gap);
+ }
+ ok = updateBooking(consumer, requestMB);
+ }
+ }
+
+ private int calculateGap(MemoryConsumer consumer, int requestMB) {
+ lock.lock();
+ try {
+ ConsumerEntry entry = booking.get(consumer);
+ int curMB = entry == null ? 0 : entry.reservedMB;
+ int delta = requestMB - curMB;
+ return delta - (totalBudgetMB - totalReservedMB);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void tryFreeUp(int gap) {
+ // note: don't hold the lock when calling consumer.freeUp(); that method takes a lock of its own, so holding ours may cause deadlock
+ for (ConsumerEntry entry : booking.values()) {
+ int mb = entry.consumer.freeUp(gap);
+ if (mb > 0) {
+ lock.lock();
+ try {
+ updateBookingWithDelta(entry.consumer, -mb);
+ } finally {
+ lock.unlock();
+ }
+ gap -= mb;
+ if (gap <= 0)
+ break;
+ }
+ }
+ if (gap > 0)
+ throw new NotEnoughBudgetException();
+
+ if (debug) {
+ if (getSystemAvailMB() < getRemainingBudgetMB()) {
+ logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+ }
+ }
+ }
+
+ private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+ lock.lock();
+ try {
+ ConsumerEntry entry = booking.get(consumer);
+ if (entry == null) {
+ if (requestMB == 0)
+ return true;
+
+ entry = new ConsumerEntry(consumer);
+ booking.put(consumer, entry);
+ }
+
+ int delta = requestMB - entry.reservedMB;
+ return updateBookingWithDelta(consumer, delta);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // lock MUST be obtained before entering
+ private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+ if (delta == 0)
+ return true;
+
+ ConsumerEntry entry = booking.get(consumer);
+ if (entry == null) {
+ if (delta <= 0)
+ return true;
+
+ entry = new ConsumerEntry(consumer);
+ booking.put(consumer, entry);
+ }
+
+ // double check gap again, it may be changed by other concurrent requests
+ if (delta > 0) {
+ int gap = delta - (totalBudgetMB - totalReservedMB);
+ if (gap > 0)
+ return false;
+ }
+
+ totalReservedMB += delta;
+ entry.reservedMB += delta;
+ if (entry.reservedMB == 0) {
+ booking.remove(entry.consumer);
+ }
+ if (debug) {
+ logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+ }
+
+ if (delta < 0) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+
+ return true;
+ }
+
+ public static long getSystemAvailBytes() {
+ Runtime runtime = Runtime.getRuntime();
+ long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+ long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+ long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+ long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+ long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+ return availableMemory;
+ }
+
+ public static int getSystemAvailMB() {
+ return (int) (getSystemAvailBytes() / ONE_MB);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
index 52c092e..2cafde0 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
@@ -10,12 +10,12 @@ import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.metadata.measure.BigDecimalSumAggregator;
import org.apache.kylin.metadata.measure.DoubleSumAggregator;
import org.apache.kylin.metadata.measure.HLLCAggregator;
import org.apache.kylin.metadata.measure.LongSumAggregator;
import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
import org.junit.Test;
/** Note: Execute each test alone to get accurate size estimate. */
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
index b7678b8..4441687 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -5,8 +5,8 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.util.List;
-import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
import org.junit.Test;
public class GTMemDiskStoreTest {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..69b3bb9
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.storage.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
+
+public class MemoryBudgetControllerTest {
+
+ @Test
+ public void test() {
+ final int n = MemoryBudgetController.getSystemAvailMB() / 2;
+ final MemoryBudgetController mbc = new MemoryBudgetController(n);
+
+ ArrayList<Consumer> mbList = new ArrayList<Consumer>();
+ for (int i = 0; i < n; i++) {
+ mbList.add(new Consumer(mbc));
+ assertEquals(mbList.size(), mbc.getTotalReservedMB());
+ }
+
+ // a's reservation will free up all the previous
+ final Consumer a = new Consumer();
+ mbc.reserve(a, n);
+ for (int i = 0; i < n; i++) {
+ assertEquals(null, mbList.get(i).data);
+ }
+
+ // cancel a in 2 seconds
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ mbc.reserve(a, 0);
+ }
+ }.start();
+
+ // b will succeed after some wait
+ long bWaitStart = System.currentTimeMillis();
+ final Consumer b = new Consumer();
+ mbc.reserveInsist(b, n);
+ assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
+
+ try {
+ mbc.reserve(a, 1);
+ fail();
+ } catch (NotEnoughBudgetException ex) {
+ // expected
+ }
+ }
+
+ class Consumer implements MemoryBudgetController.MemoryConsumer {
+
+ byte[] data;
+
+ Consumer() {
+ }
+
+ Consumer(MemoryBudgetController mbc) {
+ mbc.reserve(this, 1);
+ data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data
+ }
+
+ @Override
+ public int freeUp(int mb) {
+ if (data != null) {
+ data = null;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ }
+}