You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/31 15:55:02 UTC

incubator-kylin git commit: KYLIN-802, Let InMemCubeBuilder build multiple cuboids in parallel

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 b4b108842 -> 8a80c17b9


KYLIN-802, Let InMemCubeBuilder build multiple cuboids in parallel


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8a80c17b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8a80c17b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8a80c17b

Branch: refs/heads/0.8.0
Commit: 8a80c17b9e4f37fda3040af60782f67a53fec42a
Parents: b4b1088
Author: Yang Li <li...@apache.org>
Authored: Sun May 31 21:54:52 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun May 31 21:54:52 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 144 ------------
 common/src/main/resources/log4j.properties      |   2 +-
 .../common/util/MemoryBudgetControllerTest.java |  60 -----
 .../job/inmemcubing/InMemCubeBuilderTest.java   |   6 +-
 .../kylin/storage/cube/CubeGridTable.java       |   1 +
 .../apache/kylin/storage/gridtable/GTInfo.java  |  11 +
 .../gridtable/memstore/GTMemDiskStore.java      |  19 +-
 .../memstore/MemoryBudgetController.java        | 233 +++++++++++++++++++
 .../gridtable/AggregationCacheMemSizeTest.java  |   2 +-
 .../storage/gridtable/GTMemDiskStoreTest.java   |   2 +-
 .../gridtable/MemoryBudgetControllerTest.java   |  81 +++++++
 11 files changed, 343 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
deleted file mode 100644
index 82e503c..0000000
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.kylin.common.util;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemoryBudgetController {
-
-    public static interface MemoryConsumer {
-        // return number MB released
-        int freeUp(int mb);
-    }
-
-    @SuppressWarnings("serial")
-    public static class NotEnoughBudgetException extends IllegalStateException {
-    }
-
-    private static class ConsumerEntry {
-        final MemoryConsumer consumer;
-        int reservedMB;
-
-        ConsumerEntry(MemoryConsumer consumer) {
-            this.consumer = consumer;
-        }
-    }
-
-    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
-    public static final int ONE_MB = 1024 * 1024;
-
-    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
-
-    // all budget numbers are in MB
-    private final int totalBudgetMB;
-    private final AtomicInteger totalReservedMB;
-    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
-
-    public MemoryBudgetController(int totalBudgetMB) {
-        if (totalBudgetMB < 0)
-            throw new IllegalArgumentException();
-        if (totalBudgetMB > 0 && checkSystemAvailMB(totalBudgetMB) == false)
-            throw new IllegalStateException();
-
-        this.totalBudgetMB = totalBudgetMB;
-        this.totalReservedMB = new AtomicInteger();
-    }
-
-    public int getTotalBudgetMB() {
-        return totalBudgetMB;
-    }
-
-    public int getTotalReservedMB() {
-        return totalReservedMB.get();
-    }
-
-    public int getRemainingBudgetMB() {
-        return totalBudgetMB - totalReservedMB.get();
-    }
-
-    public void reserve(MemoryConsumer consumer, int requestMB) {
-        if (totalBudgetMB == 0 && requestMB > 0)
-            throw new NotEnoughBudgetException();
-
-        ConsumerEntry entry = booking.get(consumer);
-        if (entry == null) {
-            booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
-            entry = booking.get(consumer);
-        }
-
-        int delta = requestMB - entry.reservedMB;
-
-        if (delta > 0) {
-            checkFreeMemoryAndUpdateBooking(entry, delta);
-        } else {
-            updateBooking(entry, delta);
-        }
-    }
-
-    synchronized private void updateBooking(ConsumerEntry entry, int delta) {
-        totalReservedMB.addAndGet(delta);
-        entry.reservedMB += delta;
-        if (entry.reservedMB == 0) {
-            booking.remove(entry.consumer);
-        }
-        if (delta < 0) {
-            this.notify();
-        }
-        if (delta != 0) {
-            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
-        }
-    }
-
-    private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
-        while (true) {
-            // if budget is not enough, try free up 
-            while (delta > totalBudgetMB - totalReservedMB.get()) {
-                int freeUpToGo = delta;
-                for (ConsumerEntry entry : booking.values()) {
-                    int mb = entry.consumer.freeUp(freeUpToGo);
-                    updateBooking(entry, -mb);
-                    freeUpToGo -= mb;
-                    if (freeUpToGo <= 0)
-                        break;
-                }
-                if (freeUpToGo > 0)
-                    throw new NotEnoughBudgetException();
-            }
-
-            if (checkSystemAvailMB(delta))
-                break;
-
-            try {
-                synchronized (this) {
-                    logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
-                    this.wait(200);
-                }
-            } catch (InterruptedException e) {
-                logger.error("Interrupted while wait free memory", e);
-            }
-        }
-
-        updateBooking(consumer, delta);
-    }
-
-    private boolean checkSystemAvailMB(int mb) {
-        return getSystemAvailMB() >= mb;
-    }
-
-    public static long getSystemAvailBytes() {
-        Runtime runtime = Runtime.getRuntime();
-        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
-        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
-        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
-        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
-        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
-        return availableMemory;
-    }
-
-    public static int getSystemAvailMB() {
-        return (int) (getSystemAvailBytes() / ONE_MB);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/common/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.properties b/common/src/main/resources/log4j.properties
index 616f3a2..bfd65cf 100644
--- a/common/src/main/resources/log4j.properties
+++ b/common/src/main/resources/log4j.properties
@@ -3,7 +3,7 @@ log4j.rootLogger=INFO,stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=L4J [%d{yyyy-MM-dd HH:mm:ss,SSS}][%p][%c] - %m%n
+log4j.appender.stdout.layout.ConversionPattern=L4J [%d{HH:mm:ss,SSS}][%t][%p][%C{1}] - %m%n
 
 #log4j.logger.org.apache.hadoop=ERROR
 log4j.logger.org.apache.kylin=DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
deleted file mode 100644
index a523263..0000000
--- a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.kylin.common.util;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.junit.Test;
-
-public class MemoryBudgetControllerTest {
-
-    @Test
-    public void test() {
-        int n = MemoryBudgetController.getSystemAvailMB() / 2;
-        MemoryBudgetController mbc = new MemoryBudgetController(n);
-
-        ArrayList<OneMB> mbList = new ArrayList<OneMB>();
-        for (int i = 0; i < n; i++) {
-            mbList.add(new OneMB(mbc));
-            assertEquals(mbList.size(), mbc.getTotalReservedMB());
-        }
-
-        mbc.reserve(new OneMB(), n);
-
-        for (int i = 0; i < n; i++) {
-            assertEquals(null, mbList.get(i).data);
-        }
-
-        try {
-            mbc.reserve(new OneMB(), 1);
-            fail();
-        } catch (IllegalStateException ex) {
-            // expected
-        }
-    }
-
-    class OneMB implements MemoryBudgetController.MemoryConsumer {
-
-        byte[] data;
-
-        OneMB() {
-        }
-
-        OneMB(MemoryBudgetController mbc) {
-            mbc.reserve(this, 1);
-            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data 
-        }
-
-        @Override
-        public int freeUp(int mb) {
-            if (data != null) {
-                data = null;
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
index 0a0c97c..4e94ffa 100644
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilderTest.java
@@ -75,6 +75,9 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
     @Test
     public void test() throws Exception {
+        final int inputRows = 70000;
+        final int threads = 4;
+        
         final CubeInstance cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
         final String flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
 
@@ -82,10 +85,11 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
 
         InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(queue, cube.getDescriptor(), dictionaryMap, new ConsoleGTRecordWriter());
+        cubeBuilder.setConcurrentThreads(threads);
         ExecutorService executorService = Executors.newSingleThreadExecutor();
         Future<?> future = executorService.submit(cubeBuilder);
 
-        feedData(cube, flatTable, queue, 60000);
+        feedData(cube, flatTable, queue, inputRows);
 
         try {
             future.get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
index bc33213..89bde96 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
@@ -62,6 +62,7 @@ public class CubeGridTable {
         }
 
         GTInfo.Builder builder = GTInfo.builder();
+        builder.setTableName("Cuboid " + cuboidId);
         builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx));
         builder.setColumns(mapping.getDataTypes());
         builder.setPrimaryKey(mapping.getPrimaryKey());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index 6c9968f..5892296 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -14,6 +14,7 @@ public class GTInfo {
         return new Builder();
     }
 
+    String tableName;
     IGTCodeSystem codeSystem;
 
     // column schema
@@ -36,6 +37,10 @@ public class GTInfo {
     private GTInfo() {
     }
     
+    public String getTableName() {
+        return tableName;
+    }
+    
     public IGTCodeSystem getCodeSystem() {
         return codeSystem;
     }
@@ -181,6 +186,12 @@ public class GTInfo {
             this.info = new GTInfo();
         }
 
+        /** optional */
+        public Builder setTableName(String name) {
+            info.tableName = name;
+            return this;
+        }
+        
         /** required */
         public Builder setCodeSystem(IGTCodeSystem cs) {
             info.codeSystem = cs;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
index 0b2c0a4..45a51ee 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -1,6 +1,6 @@
 package org.apache.kylin.storage.gridtable.memstore;
 
-import static org.apache.kylin.common.util.MemoryBudgetController.*;
+import static org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.*;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -17,24 +17,23 @@ import java.nio.file.StandardOpenOption;
 import java.util.BitSet;
 import java.util.NoSuchElementException;
 
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
 import org.apache.kylin.storage.gridtable.GTInfo;
 import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTRowBlock;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.IGTStore;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class GTMemDiskStore implements IGTStore, Closeable {
 
     private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
-    private static final boolean debug = false;
+    private static final boolean debug = true;
 
     private static final int STREAM_BUFFER_SIZE = 8192;
-    private static final int MEM_CHUNK_SIZE_MB = 2;
+    private static final int MEM_CHUNK_SIZE_MB = 5;
 
     private final GTInfo info;
     private final Object lock; // all public methods that read/write object states are synchronized on this lock
@@ -111,7 +110,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
 
     @Override
     public String toString() {
-        return "MemDiskStore@" + this.hashCode();
+        return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
     }
 
     private class Reader implements IGTStoreScanner {
@@ -481,8 +480,8 @@ public class GTMemDiskStore implements IGTStore, Closeable {
                 data = null;
                 synchronized (lock) {
                     asyncFlushDiskOffset += flushedLen; // bytes written in last loop
-                    if (debug)
-                        logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+                    //                    if (debug)
+                    //                        logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
                     if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
                         asyncFlushChunk = asyncFlushChunk.next;
                     }
@@ -556,7 +555,7 @@ public class GTMemDiskStore implements IGTStore, Closeable {
             synchronized (lock) {
                 if (asyncFlushException != null)
                     throwAsyncException(asyncFlushException);
-                
+
                 clear();
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
new file mode 100644
index 0000000..c4efa75
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
@@ -0,0 +1,233 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBudgetController {
+
+    private static final boolean debug = true;
+
+    public static interface MemoryConsumer {
+        // return number MB released
+        int freeUp(int mb);
+    }
+
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+    public static final int ONE_MB = 1024 * 1024;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+    private int totalReservedMB;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    public MemoryBudgetController(int totalBudgetMB) {
+        if (totalBudgetMB < 0)
+            throw new IllegalArgumentException();
+        if (totalBudgetMB > getSystemAvailMB())
+            throw new IllegalStateException();
+
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = 0;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        lock.lock();
+        try {
+            return totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRemainingBudgetMB() {
+        lock.lock();
+        try {
+            return totalBudgetMB - totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+        long waitStart = 0;
+        while (true) {
+            try {
+                reserve(consumer, requestMB);
+                if (debug && waitStart > 0)
+                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+                return;
+            } catch (NotEnoughBudgetException ex) {
+                // retry
+            }
+
+            if (waitStart == 0)
+                waitStart = System.currentTimeMillis();
+
+            synchronized (lock) {
+                try {
+                    lock.wait();
+                } catch (InterruptedException e) {
+                    throw new NotEnoughBudgetException(e);
+                }
+            }
+        }
+    }
+
+    /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+
+        boolean ok = false;
+        while (!ok) {
+            int gap = calculateGap(consumer, requestMB);
+            if (gap > 0) {
+                // to void deadlock, don't hold lock when invoking consumer.freeUp()
+                tryFreeUp(gap);
+            }
+            ok = updateBooking(consumer, requestMB);
+        }
+    }
+
+    private int calculateGap(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            int delta = requestMB - curMB;
+            return delta - (totalBudgetMB - totalReservedMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void tryFreeUp(int gap) {
+        // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
+        for (ConsumerEntry entry : booking.values()) {
+            int mb = entry.consumer.freeUp(gap);
+            if (mb > 0) {
+                lock.lock();
+                try {
+                    updateBookingWithDelta(entry.consumer, -mb);
+                } finally {
+                    lock.unlock();
+                }
+                gap -= mb;
+                if (gap <= 0)
+                    break;
+            }
+        }
+        if (gap > 0)
+            throw new NotEnoughBudgetException();
+
+        if (debug) {
+            if (getSystemAvailMB() < getRemainingBudgetMB()) {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+            }
+        }
+    }
+
+    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            if (entry == null) {
+                if (requestMB == 0)
+                    return true;
+
+                entry = new ConsumerEntry(consumer);
+                booking.put(consumer, entry);
+            }
+
+            int delta = requestMB - entry.reservedMB;
+            return updateBookingWithDelta(consumer, delta);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // lock MUST be obtained before entering
+    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+        if (delta == 0)
+            return true;
+
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            if (delta <= 0)
+                return true;
+
+            entry = new ConsumerEntry(consumer);
+            booking.put(consumer, entry);
+        }
+
+        // double check gap again, it may be changed by other concurrent requests
+        if (delta > 0) {
+            int gap = delta - (totalBudgetMB - totalReservedMB);
+            if (gap > 0)
+                return false;
+        }
+
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (debug) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        }
+
+        if (delta < 0) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+
+        return true;
+    }
+
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    public static int getSystemAvailMB() {
+        return (int) (getSystemAvailBytes() / ONE_MB);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
index 52c092e..2cafde0 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
@@ -10,12 +10,12 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.metadata.measure.BigDecimalSumAggregator;
 import org.apache.kylin.metadata.measure.DoubleSumAggregator;
 import org.apache.kylin.metadata.measure.HLLCAggregator;
 import org.apache.kylin.metadata.measure.LongSumAggregator;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
 import org.junit.Test;
 
 /** Note: Execute each test alone to get accurate size estimate. */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
index b7678b8..4441687 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -5,8 +5,8 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
 import org.junit.Test;
 
 public class GTMemDiskStoreTest {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8a80c17b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..69b3bb9
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.storage.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
+
+public class MemoryBudgetControllerTest {
+
+    @Test
+    public void test() {
+        final int n = MemoryBudgetController.getSystemAvailMB() / 2;
+        final MemoryBudgetController mbc = new MemoryBudgetController(n);
+
+        ArrayList<Consumer> mbList = new ArrayList<Consumer>();
+        for (int i = 0; i < n; i++) {
+            mbList.add(new Consumer(mbc));
+            assertEquals(mbList.size(), mbc.getTotalReservedMB());
+        }
+
+        // a's reservation will free up all the previous
+        final Consumer a = new Consumer();
+        mbc.reserve(a, n);
+        for (int i = 0; i < n; i++) {
+            assertEquals(null, mbList.get(i).data);
+        }
+        
+        // cancel a in 2 seconds
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                mbc.reserve(a, 0);
+            }
+        }.start();
+        
+        // b will success after some wait
+        long bWaitStart = System.currentTimeMillis();
+        final Consumer b = new Consumer();
+        mbc.reserveInsist(b, n);
+        assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
+
+        try {
+            mbc.reserve(a, 1);
+            fail();
+        } catch (NotEnoughBudgetException ex) {
+            // expected
+        }
+    }
+
+    class Consumer implements MemoryBudgetController.MemoryConsumer {
+
+        byte[] data;
+
+        Consumer() {
+        }
+
+        Consumer(MemoryBudgetController mbc) {
+            mbc.reserve(this, 1);
+            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data 
+        }
+
+        @Override
+        public int freeUp(int mb) {
+            if (data != null) {
+                data = null;
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+
+    }
+}