You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/01 15:19:45 UTC

[1/2] incubator-kylin git commit: KYLIN-806, refactor in-mem cube classes to under job.inmemcubing package

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 731f33b0c -> 765dfca8c


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
deleted file mode 100644
index 69b3bb9..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/MemoryBudgetControllerTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
-import org.junit.Test;
-
-public class MemoryBudgetControllerTest {
-
-    @Test
-    public void test() {
-        final int n = MemoryBudgetController.getSystemAvailMB() / 2;
-        final MemoryBudgetController mbc = new MemoryBudgetController(n);
-
-        ArrayList<Consumer> mbList = new ArrayList<Consumer>();
-        for (int i = 0; i < n; i++) {
-            mbList.add(new Consumer(mbc));
-            assertEquals(mbList.size(), mbc.getTotalReservedMB());
-        }
-
-        // a's reservation will free up all the previous
-        final Consumer a = new Consumer();
-        mbc.reserve(a, n);
-        for (int i = 0; i < n; i++) {
-            assertEquals(null, mbList.get(i).data);
-        }
-        
-        // cancel a in 2 seconds
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    Thread.sleep(2000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                mbc.reserve(a, 0);
-            }
-        }.start();
-        
-        // b will success after some wait
-        long bWaitStart = System.currentTimeMillis();
-        final Consumer b = new Consumer();
-        mbc.reserveInsist(b, n);
-        assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
-
-        try {
-            mbc.reserve(a, 1);
-            fail();
-        } catch (NotEnoughBudgetException ex) {
-            // expected
-        }
-    }
-
-    class Consumer implements MemoryBudgetController.MemoryConsumer {
-
-        byte[] data;
-
-        Consumer() {
-        }
-
-        Consumer(MemoryBudgetController mbc) {
-            mbc.reserve(this, 1);
-            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data 
-        }
-
-        @Override
-        public int freeUp(int mb) {
-            if (data != null) {
-                data = null;
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 88db599..bcf0f29 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.kylin.storage.gridtable;
 
 import static org.junit.Assert.*;
@@ -130,7 +147,7 @@ public class SimpleGridTableTest {
         return builder;
     }
     
-    static List<GTRecord> mockupData(GTInfo info, int nRows) {
+    public static List<GTRecord> mockupData(GTInfo info, int nRows) {
         List<GTRecord> result = new ArrayList<GTRecord>(nRows);
         int round = nRows / 10;
         for (int i = 0; i < round; i++) {
@@ -199,13 +216,13 @@ public class SimpleGridTableTest {
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
     }
 
-    static GTInfo basicInfo() {
+    public static GTInfo basicInfo() {
         Builder builder = infoBuilder();
         GTInfo info = builder.build();
         return info;
     }
 
-    static GTInfo advancedInfo() {
+    public static GTInfo advancedInfo() {
         Builder builder = infoBuilder();
         builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0), setOf(1, 2), setOf(3, 4) });
         builder.enableRowBlock(4);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
index ee3826a..6cfb32f 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.kylin.storage.gridtable;
 
 import static org.junit.Assert.*;


[2/2] incubator-kylin git commit: KYLIN-806, refactor in-mem cube classes to under job.inmemcubing package

Posted by li...@apache.org.
KYLIN-806, refactor in-mem cube classes to under job.inmemcubing package


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/765dfca8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/765dfca8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/765dfca8

Branch: refs/heads/0.8.0
Commit: 765dfca8ca4374bfd811c2aae485a182e420c042
Parents: 731f33b
Author: Yang Li <li...@apache.org>
Authored: Mon Jun 1 21:18:24 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Mon Jun 1 21:19:18 2015 +0800

----------------------------------------------------------------------
 .../job/inmemcubing/ConcurrentDiskStore.java    |  22 +
 .../kylin/job/inmemcubing/ICuboidWriter.java    |  17 +
 .../kylin/job/inmemcubing/InMemCubeBuilder.java |   6 +-
 .../kylin/job/inmemcubing/MemDiskStore.java     | 678 +++++++++++++++++++
 .../job/inmemcubing/MemoryBudgetController.java | 250 +++++++
 .../job/inmemcubing/GTMemDiskStoreTest.java     |  98 +++
 .../inmemcubing/MemoryBudgetControllerTest.java |  98 +++
 .../gridtable/memstore/GTMemDiskStore.java      | 661 ------------------
 .../memstore/MemoryBudgetController.java        | 233 -------
 .../gridtable/AggregationCacheMemSizeTest.java  |  31 +-
 .../storage/gridtable/DictGridTableTest.java    |  17 +
 .../storage/gridtable/GTMemDiskStoreTest.java   |  74 --
 .../gridtable/MemoryBudgetControllerTest.java   |  81 ---
 .../storage/gridtable/SimpleGridTableTest.java  |  23 +-
 .../gridtable/SimpleInvertedIndexTest.java      |  17 +
 15 files changed, 1247 insertions(+), 1059 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
new file mode 100644
index 0000000..cca1040
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
@@ -0,0 +1,22 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+public class ConcurrentDiskStore {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
index c05f217..91d4a2a 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.kylin.job.inmemcubing;
 
 import org.apache.kylin.storage.gridtable.GTRecord;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 567716a..1207466 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -55,8 +55,6 @@ import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.GridTable;
 import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -160,7 +158,7 @@ public class InMemCubeBuilder implements Runnable {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
         // could use a real budget controller, but experiment shows write directly to disk is the same fast
         // GTMemDiskStore store = new GTMemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
-        GTMemDiskStore store = new GTMemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
         GridTable gridTable = new GridTable(info, store);
         return gridTable;
     }
@@ -568,7 +566,7 @@ public class InMemCubeBuilder implements Runnable {
     }
 
     private void closeStore(GridTable gt) throws IOException {
-        ((GTMemDiskStore) gt.getStore()).close();
+        ((MemDiskStore) gt.getStore()).close();
     }
 
     // ===========================================================================

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
new file mode 100644
index 0000000..44376e5
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
@@ -0,0 +1,678 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.apache.kylin.job.inmemcubing.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemDiskStore implements IGTStore, Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
+    private static final boolean debug = true;
+
+    private static final int STREAM_BUFFER_SIZE = 8192;
+    private static final int MEM_CHUNK_SIZE_MB = 5;
+
+    private final GTInfo info;
+    private final Object lock; // all public methods that read/write object states are synchronized on this lock
+    private final MemPart memPart;
+    private final DiskPart diskPart;
+    private final boolean delOnClose;
+
+    private Writer ongoingWriter;
+
+    public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+        this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+    }
+
+    public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+        this(info, budgetCtrl, diskFile, false);
+    }
+
+    private MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+        this.info = info;
+        this.lock = this; // the store instance itself is the monitor used by every synchronized (lock) block in this class
+        this.memPart = new MemPart(budgetCtrl);
+        this.diskPart = new DiskPart(diskFile);
+        this.delOnClose = delOnClose;
+
+        // safety net in case the caller forgets to call close(): have the file removed at JVM exit
+        if (delOnClose)
+            diskFile.deleteOnExit();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return newWriter(0); // the shard argument is not used here; the writer is asked to start from offset 0
+    }
+
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return newWriter(length()); // NOTE: shard and fillLast are unused, and Writer currently ignores the start offset: it resets to 0 and clears both parts (append is a TODO there)
+    }
+
+    private Writer newWriter(long startOffset) throws IOException {
+        synchronized (lock) {
+            if (ongoingWriter != null)
+                throw new IllegalStateException(); // only one writer may be open at a time; the field is reset to null in Writer.close()
+
+            ongoingWriter = new Writer(startOffset);
+            return ongoingWriter;
+        }
+    }
+
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        synchronized (lock) {
+            return new Reader();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // synchronized inside the parts close()
+        memPart.close(); // NOTE(review): if this throws, diskPart.close() below is skipped — confirm that is acceptable
+        diskPart.close();
+    }
+
+    public long length() {
+        synchronized (lock) {
+            return Math.max(memPart.tailOffset(), diskPart.tailOffset);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
+    }
+
+    private class Reader implements IGTStoreScanner {
+
+        final DataInputStream din;
+        long readOffset = 0;
+        long memRead = 0;
+        long diskRead = 0;
+        int nReadCalls = 0;
+
+        GTRowBlock block = GTRowBlock.allocate(info);
+        GTRowBlock next = null;
+
+        Reader() throws IOException {
+            diskPart.openRead();
+            if (debug)
+                logger.debug(MemDiskStore.this + " read start @ " + readOffset);
+
+            InputStream in = new InputStream() {
+                byte[] tmp = new byte[1];
+                MemChunk memChunk;
+
+                @Override
+                public int read() throws IOException { // single-byte read; delegates to read(byte[], int, int)
+                    int n = read(tmp, 0, 1);
+                    if (n <= 0)
+                        return -1;
+                    else
+                        return tmp[0] & 0xFF; // InputStream.read() must return 0..255; a sign-extended byte (0xFF -> -1) would be mistaken for EOF
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    synchronized (lock) {
+                        nReadCalls++;
+                        if (available() <= 0)
+                            return -1;
+
+                        if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
+                            memChunk = memPart.seekMemChunk(readOffset);
+                        }
+
+                        int lenToGo = Math.min(available(), len);
+
+                        int nRead = 0;
+                        while (lenToGo > 0) {
+                            int n;
+                            if (memChunk != null) {
+                                if (memChunk.headOffset() > readOffset) {
+                                    memChunk = null;
+                                    continue;
+                                }
+                                if (readOffset >= memChunk.tailOffset()) {
+                                    memChunk = memChunk.next;
+                                    continue;
+                                }
+                                int chunkOffset = (int) (readOffset - memChunk.headOffset());
+                                n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
+                                System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+                                memRead += n;
+                            } else {
+                                n = diskPart.read(readOffset, b, off, lenToGo);
+                                diskRead += n;
+                            }
+                            lenToGo -= n;
+                            nRead += n;
+                            off += n;
+                            readOffset += n;
+                        }
+                        return nRead;
+                    }
+                }
+
+                @Override
+                public int available() throws IOException {
+                    synchronized (lock) {
+                        return (int) Math.min(length() - readOffset, (long) Integer.MAX_VALUE); // clamp: a plain (int) cast wraps (possibly negative, i.e. false EOF in read()) once more than 2 GB remain
+                    }
+                }
+            };
+
+            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            try {
+                if (din.available() > 0) {
+                    block.importFrom(din);
+                    next = block;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            return next != null;
+        }
+
+        @Override
+        public GTRowBlock next() {
+            if (next == null) {
+                hasNext();
+                if (next == null)
+                    throw new NoSuchElementException();
+            }
+            GTRowBlock r = next;
+            next = null;
+            return r;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                din.close();
+                diskPart.closeRead();
+                if (debug)
+                    logger.debug(MemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+            }
+        }
+
+    }
+
+    private class Writer implements IGTStoreWriter {
+
+        final DataOutputStream dout;
+        long writeOffset;
+        long memWrite = 0;
+        long diskWrite = 0;
+        int nWriteCalls;
+        boolean closed = false;
+
+        Writer(long startOffset) throws IOException {
+            writeOffset = 0; // TODO does not support append yet
+            memPart.clear();
+            diskPart.clear();
+            diskPart.openWrite(false);
+            if (debug)
+                logger.debug(MemDiskStore.this + " write start @ " + writeOffset);
+
+            memPart.activateMemWrite();
+
+            OutputStream out = new OutputStream() {
+                byte[] tmp = new byte[1];
+                boolean memPartActivated = true;
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    // lock inside memPart.write() and diskPart.write()
+                    nWriteCalls++;
+                    while (length > 0) {
+                        int n;
+                        if (memPartActivated) {
+                            n = memPart.write(bytes, offset, length, writeOffset);
+                            memWrite += n;
+                            if (n == 0) {
+                                memPartActivated = false; // memory part accepted nothing; this stream writes to disk only from now on
+                            }
+                        } else {
+                            n = diskPart.write(writeOffset, bytes, offset, length);
+                            diskWrite += n;
+                        }
+                        offset += n;
+                        length -= n;
+                        writeOffset += n;
+                    }
+                }
+            };
+            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                if (!closed) {
+                    dout.close();
+                    memPart.deactivateMemWrite();
+                }
+
+                if (memPart.asyncFlusher == null) {
+                    assert writeOffset == diskPart.tailOffset;
+                    diskPart.closeWrite();
+                    ongoingWriter = null;
+                    if (debug)
+                        logger.debug(MemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+                } else {
+                    // the asyncFlusher will call this close() again later
+                }
+                closed = true;
+            }
+        }
+    }
+
+    private static class MemChunk {
+        long diskOffset;
+        int length;
+        byte[] data;
+        MemChunk next;
+
+        boolean isFull() {
+            return length == data.length;
+        }
+
+        long headOffset() {
+            return diskOffset;
+        }
+
+        long tailOffset() {
+            return diskOffset + length;
+        }
+
+        int freeSpace() {
+            return data.length - length;
+        }
+    }
+
+    private class MemPart implements Closeable, MemoryConsumer {
+
+        final MemoryBudgetController budgetCtrl;
+
+        // async flush thread checks this flag out of sync block
+        volatile boolean writeActivated;
+        MemChunk firstChunk;
+        MemChunk lastChunk;
+        int chunkCount;
+
+        Thread asyncFlusher;
+        MemChunk asyncFlushChunk;
+        long asyncFlushDiskOffset;
+        Throwable asyncFlushException;
+
+        MemPart(MemoryBudgetController budgetCtrl) {
+            this.budgetCtrl = budgetCtrl;
+        }
+
+        long headOffset() {
+            return firstChunk == null ? 0 : firstChunk.headOffset();
+        }
+
+        long tailOffset() {
+            return lastChunk == null ? 0 : lastChunk.tailOffset();
+        }
+
+        public MemChunk seekMemChunk(long diskOffset) {
+            MemChunk c = firstChunk;
+            while (c != null && c.headOffset() <= diskOffset) {
+                if (diskOffset < c.tailOffset())
+                    break;
+                c = c.next;
+            }
+            return c;
+        }
+
+        public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            int needMoreMem = 0;
+
+            synchronized (lock) {
+                if (writeActivated == false)
+                    return 0;
+
+                // write is only expected at the tail
+                if (diskOffset != tailOffset())
+                    return 0;
+
+                if (chunkCount == 0 || lastChunk.isFull())
+                    needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB; // size in MB of all chunks including the one about to be added
+            }
+
+            // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
+            if (needMoreMem > 0) {
+                try {
+                    budgetCtrl.reserve(this, needMoreMem);
+                } catch (NotEnoughBudgetException ex) {
+                    deactivateMemWrite();
+                    return 0;
+                }
+            }
+
+            synchronized (lock) { // NOTE(review): the lock was released above, so freeUp() may have dropped chunks meanwhile; lastChunk is not re-checked for null on the needMoreMem == 0 path — confirm
+                if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
+                    MemChunk chunk = new MemChunk();
+                    chunk.diskOffset = diskOffset;
+                    chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+                    if (chunkCount == 0) {
+                        firstChunk = lastChunk = chunk;
+                    } else {
+                        lastChunk.next = chunk;
+                        lastChunk = chunk;
+                    }
+                    chunkCount++;
+                }
+
+                int n = Math.min(lastChunk.freeSpace(), length); // copy only what fits in the tail chunk; the caller loops for the rest
+                System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+                lastChunk.length += n;
+
+                if (n > 0)
+                    asyncFlush(lastChunk, diskOffset, n);
+
+                return n;
+            }
+        }
+
+        /**
+         * Makes sure a background thread is flushing memory chunks to disk.
+         * <p>
+         * If no chunk is currently marked for flushing, flushing starts from <code>lastChunk</code> at
+         * <code>diskOffset</code>. If no flusher thread exists, one is created and started. The thread
+         * calls flushToDisk() every 10 ms while <code>writeActivated</code> is set, flushes one more time
+         * after it is cleared, and then resets the flusher state under the lock.
+         * <p>
+         * NOTE(review): this method itself takes no lock; presumably the caller already holds
+         * <code>lock</code> -- confirm.
+         *
+         * @param lastChunk  chunk to start flushing from when no flush position is recorded yet
+         * @param diskOffset disk offset to start flushing at when no flush position is recorded yet
+         * @param n          not used by this method
+         */
+        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+            // remember where flushing has to start, unless a flush position is already recorded
+            if (asyncFlushChunk == null) {
+                asyncFlushChunk = lastChunk;
+                asyncFlushDiskOffset = diskOffset;
+            }
+
+            // at most one flusher thread; it is only created when none is registered
+            if (asyncFlusher == null) {
+                asyncFlusher = new Thread() {
+                    public void run() {
+                        asyncFlushException = null;
+                        if (debug)
+                            logger.debug(MemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+                        try {
+                            // keep flushing for as long as mem write is active
+                            while (writeActivated) {
+                                flushToDisk();
+                                Thread.sleep(10);
+                            }
+                            // one more flush to pick up what was written since the last loop
+                            flushToDisk();
+
+                            if (debug)
+                                logger.debug(MemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+
+                            synchronized (lock) {
+                                // unregister, so that a later asyncFlush() starts a fresh thread
+                                asyncFlusher = null;
+                                asyncFlushChunk = null;
+                                if (ongoingWriter.closed) {
+                                    ongoingWriter.close(); // call writer.close() again to clean up
+                                }
+                            }
+                        } catch (Throwable ex) {
+                            // kept for close() to report; note asyncFlusher and asyncFlushChunk are NOT reset on this path
+                            asyncFlushException = ex;
+                        }
+                    }
+                };
+                asyncFlusher.start();
+            }
+        }
+
+        /**
+         * Writes the not-yet-flushed bytes of the chunk list to disk, starting at the recorded flush
+         * position, and returns once no chunk is left to flush.
+         * <p>
+         * The flush position is advanced while holding <code>lock</code>; the call to
+         * <code>diskPart.write()</code> is made after that synchronized block has been left.
+         *
+         * @throws IOException if <code>diskPart.write()</code> fails
+         */
+        private void flushToDisk() throws IOException {
+            byte[] data;
+            int offset = 0;
+            int length = 0;
+            int flushedLen = 0;
+
+            while (true) {
+                data = null;
+                synchronized (lock) {
+                    asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+                    //                    if (debug)
+                    //                        logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+                    // the current chunk has been flushed up to its tail, move on to the next one (may be null)
+                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+                        asyncFlushChunk = asyncFlushChunk.next;
+                    }
+                    // pick the part of the chunk that lies at or after the flush position
+                    if (asyncFlushChunk != null) {
+                        data = asyncFlushChunk.data;
+                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+                        length = asyncFlushChunk.length - offset;
+                    }
+                }
+
+                // no chunk left, everything is flushed
+                if (data == null)
+                    break;
+
+                // the write may be partial; the returned count is added to the flush position in the next loop
+                flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
+            }
+        }
+
+        /**
+         * Drops chunks from the head of the chunk list until at least <code>mb</code> MB are released,
+         * the list is empty, or the head chunk is the one the async flusher currently points at.
+         *
+         * @param mb number of MB asked for
+         * @return number of MB released, always a multiple of MEM_CHUNK_SIZE_MB
+         */
+        @Override
+        public int freeUp(int mb) {
+            synchronized (lock) {
+                int released = 0;
+                // never drop the chunk the async flusher points at
+                while (released < mb && chunkCount > 0 && firstChunk != asyncFlushChunk) {
+                    released += MEM_CHUNK_SIZE_MB;
+                    chunkCount--;
+                    if (chunkCount > 0) {
+                        // unlink the head and promote its successor
+                        MemChunk head = firstChunk;
+                        firstChunk = head.next;
+                        head.next = null;
+                    } else {
+                        firstChunk = lastChunk = null;
+                    }
+                }
+                return released;
+            }
+        }
+
+        /** Switches mem write on, but only when the budget controller has a budget above zero. */
+        public void activateMemWrite() {
+            if (budgetCtrl.getTotalBudgetMB() <= 0)
+                return;
+
+            writeActivated = true;
+            if (debug) {
+                logger.debug(MemDiskStore.this + " mem write activated");
+            }
+        }
+
+        /** Switches mem write off; the async flusher loop checks this flag and stops looping once it is cleared. */
+        public void deactivateMemWrite() {
+            writeActivated = false;
+            if (debug) {
+                logger.debug(MemDiskStore.this + " mem write de-activated");
+            }
+        }
+
+        /** Forgets all memory chunks and sets this consumer's reservation at the budget controller to 0 MB. */
+        public void clear() {
+            firstChunk = null;
+            lastChunk = null;
+            chunkCount = 0;
+            budgetCtrl.reserve(this, 0);
+        }
+
+        /**
+         * Waits for the async flusher (if there is one) to finish, reports any error it ran into, then
+         * drops all memory chunks and gives the reserved budget back.
+         *
+         * @throws IOException if the async flusher failed; a non-IO failure is wrapped as the cause
+         */
+        @Override
+        public void close() throws IOException {
+            final Thread flusher;
+            synchronized (lock) {
+                if (asyncFlushException != null)
+                    throwAsyncException(asyncFlushException);
+
+                // snapshot the field instead of catching NullPointerException:
+                // the flusher thread sets it to null when it finishes
+                flusher = asyncFlusher;
+            }
+
+            // join outside the lock, the flusher synchronizes on the same lock to make progress
+            if (flusher != null) {
+                try {
+                    flusher.join();
+                } catch (InterruptedException e) {
+                    logger.warn("async join interrupted", e);
+                    // keep the interrupt visible to the caller
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            synchronized (lock) {
+                // the flusher may have failed while we were waiting for it
+                if (asyncFlushException != null)
+                    throwAsyncException(asyncFlushException);
+
+                clear();
+            }
+        }
+
+        /**
+         * Re-throws a failure of the async flusher on the calling thread.
+         *
+         * @param ex the recorded failure
+         * @throws IOException <code>ex</code> itself when it is an IOException, otherwise a new IOException with <code>ex</code> as cause
+         */
+        private void throwAsyncException(Throwable ex) throws IOException {
+            throw (ex instanceof IOException) ? (IOException) ex : new IOException(ex);
+        }
+
+        /** Uses the string form of the enclosing store, so log lines of the mem part carry the store's name. */
+        @Override
+        public String toString() {
+            return String.valueOf(MemDiskStore.this);
+        }
+
+    }
+
+    /**
+     * The on-disk half of the store: one file, one channel for writing and one channel shared by
+     * all readers. Only write() and close() synchronize on <code>lock</code> themselves; the other
+     * methods take no lock of their own.
+     */
+    private class DiskPart implements Closeable {
+        final File diskFile;
+        FileChannel writeChannel;
+        FileChannel readChannel;
+        int readerCount = 0; // allow parallel readers
+        long tailOffset;
+
+        /**
+         * @param diskFile backing file; its current length becomes the initial tail offset
+         */
+        DiskPart(File diskFile) throws IOException {
+            this.diskFile = diskFile;
+            this.tailOffset = diskFile.length();
+            if (debug)
+                logger.debug(MemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+        }
+
+        /** Registers one more reader, opening the shared read channel if it is not open yet. */
+        public void openRead() throws IOException {
+            if (readChannel == null) {
+                readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+            }
+            readerCount++;
+        }
+
+        /**
+         * Positional read through the shared read channel.
+         *
+         * @return whatever <code>FileChannel.read()</code> returns: bytes read, possibly fewer than <code>length</code>
+         */
+        public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+        }
+
+        /** Unregisters one reader; the read channel is closed when the last reader is gone. */
+        public void closeRead() throws IOException {
+            closeRead(false);
+        }
+
+        /**
+         * @param force when true, all readers are considered gone and the read channel is closed regardless
+         */
+        private void closeRead(boolean force) throws IOException {
+            // never let the count drop below zero: a negative count would keep a later
+            // openRead()/closeRead() pair from reaching 0 again and leak the read channel
+            if (force) {
+                readerCount = 0;
+            } else if (readerCount > 0) {
+                readerCount--;
+            }
+
+            if (readerCount == 0) {
+                if (readChannel != null) {
+                    readChannel.close();
+                    readChannel = null;
+                }
+            }
+        }
+
+        /**
+         * Opens the write channel.
+         *
+         * @param append true to keep the existing file content and continue at its end,
+         *               false to delete the file and start from offset 0
+         */
+        public void openWrite(boolean append) throws IOException {
+            // release a channel that is still open from an earlier call, otherwise its file handle leaks
+            closeWrite();
+
+            if (append) {
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+                tailOffset = diskFile.length();
+            } else {
+                // CREATE_NEW fails if the delete did not succeed, so a failed delete does not go unnoticed
+                diskFile.delete();
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+                tailOffset = 0;
+            }
+        }
+
+        /**
+         * Positional write through the write channel; moves the tail offset forward when the write
+         * ends beyond it.
+         *
+         * @return number of bytes written, as returned by <code>FileChannel.write()</code>
+         */
+        public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            synchronized (lock) {
+                int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+                tailOffset = Math.max(diskOffset + n, tailOffset);
+                return n;
+            }
+        }
+
+        /** Closes the write channel if it is open. */
+        public void closeWrite() throws IOException {
+            if (writeChannel != null) {
+                writeChannel.close();
+                writeChannel = null;
+            }
+        }
+
+        /** Deletes the disk file and resets the tail offset to 0; open channels are left as they are. */
+        public void clear() throws IOException {
+            diskFile.delete();
+            tailOffset = 0;
+        }
+
+        /** Closes both channels and, when the store is marked delete-on-close, removes the disk file. */
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                closeWrite();
+                closeRead(true);
+                if (delOnClose) {
+                    diskFile.delete();
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
new file mode 100644
index 0000000..4b50160
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemoryBudgetController.java
@@ -0,0 +1,250 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Book-keeper of a fixed memory budget, counted in MB, that is shared by several
+ * {@link MemoryConsumer}s.
+ * <p>
+ * A consumer states the total amount it wants to hold with {@link #reserve(MemoryConsumer, int)}
+ * (fail fast) or {@link #reserveInsist(MemoryConsumer, int)} (block until granted). When the budget
+ * is short, the registered consumers are asked to free memory up.
+ * <p>
+ * Two synchronization devices are used on the same object: the {@link ReentrantLock} API of
+ * <code>lock</code> guards the booking numbers, while the intrinsic monitor of <code>lock</code>
+ * (synchronized / wait / notifyAll) is only used to park and wake up threads in reserveInsist().
+ */
+public class MemoryBudgetController {
+
+    private static final boolean debug = true;
+
+    // Longest time reserveInsist() sleeps before it re-tries on its own. A release can happen between
+    // a failed reserve() and the following wait(); without a timeout that notification would be lost
+    // and the waiter could sleep forever.
+    private static final long INSIST_RETRY_INTERVAL_MS = 1000;
+
+    /** A holder of memory that can be asked to give some of it back. */
+    public static interface MemoryConsumer {
+        // return number MB released
+        int freeUp(int mb);
+    }
+
+    /** Thrown when a reservation cannot be satisfied from the budget. */
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+
+        public NotEnoughBudgetException() {
+            super();
+        }
+
+        public NotEnoughBudgetException(Throwable cause) {
+            super(cause);
+        }
+    }
+
+    /** One line of the booking: a consumer and the MB currently reserved for it. */
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
+    public static final int ONE_MB = 1024 * 1024;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+    private int totalReservedMB;
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /**
+     * @param totalBudgetMB the budget to manage, in MB
+     * @throws IllegalArgumentException if the budget is negative
+     * @throws IllegalStateException if the budget is more than the JVM currently has available
+     */
+    public MemoryBudgetController(int totalBudgetMB) {
+        if (totalBudgetMB < 0)
+            throw new IllegalArgumentException("totalBudgetMB must not be negative: " + totalBudgetMB);
+        int systemAvailMB = getSystemAvailMB();
+        if (totalBudgetMB > systemAvailMB)
+            throw new IllegalStateException("totalBudgetMB " + totalBudgetMB + " exceeds the " + systemAvailMB + " MB available to the JVM");
+
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = 0;
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        lock.lock();
+        try {
+            return totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public int getRemainingBudgetMB() {
+        lock.lock();
+        try {
+            return totalBudgetMB - totalReservedMB;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Like {@link #reserve(MemoryConsumer, int)}, but blocks and re-tries until the reservation is granted.
+     *
+     * @throws NotEnoughBudgetException if the request is bigger than the whole budget (it could never
+     *         be granted), or if the thread is interrupted while waiting; in the latter case the
+     *         interrupt status of the thread is preserved
+     */
+    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
+        // waiting cannot help a request that exceeds the whole budget
+        if (requestMB > totalBudgetMB)
+            throw new NotEnoughBudgetException();
+
+        long waitStart = 0;
+        while (true) {
+            try {
+                reserve(consumer, requestMB);
+                if (debug && waitStart > 0)
+                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
+                return;
+            } catch (NotEnoughBudgetException ex) {
+                // retry
+            }
+
+            if (waitStart == 0)
+                waitStart = System.currentTimeMillis();
+
+            synchronized (lock) {
+                try {
+                    // woken up by a release, or by the timeout in case the notification was missed
+                    lock.wait(INSIST_RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new NotEnoughBudgetException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * reserve without wait, fail with NotEnoughBudgetException immediately if no mem
+     *
+     * @param consumer  who the reservation is for
+     * @param requestMB the total the consumer wants to hold from now on, 0 releases everything
+     * @throws IllegalArgumentException if requestMB is negative
+     * @throws NotEnoughBudgetException if the budget is short even after asking consumers to free up
+     */
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (requestMB < 0)
+            throw new IllegalArgumentException("requestMB must not be negative: " + requestMB);
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+
+        boolean ok = false;
+        while (!ok) {
+            int gap = calculateGap(consumer, requestMB);
+            if (gap > 0) {
+                // to avoid deadlock, don't hold lock when invoking consumer.freeUp()
+                tryFreeUp(gap);
+            }
+            // false when a concurrent request took the budget in between, then start over
+            ok = updateBooking(consumer, requestMB);
+        }
+    }
+
+    /** Returns how many MB are missing for the request; zero or negative means the budget suffices. */
+    private int calculateGap(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            int delta = requestMB - curMB;
+            return delta - (totalBudgetMB - totalReservedMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Asks the booked consumers, one after another, to free memory up until <code>gap</code> MB are gained.
+     *
+     * @throws NotEnoughBudgetException if all consumers together released less than <code>gap</code> MB
+     */
+    private void tryFreeUp(int gap) {
+        // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
+        for (ConsumerEntry entry : booking.values()) {
+            int mb = entry.consumer.freeUp(gap);
+            if (mb > 0) {
+                lock.lock();
+                try {
+                    updateBookingWithDelta(entry.consumer, -mb);
+                } finally {
+                    lock.unlock();
+                }
+                gap -= mb;
+                if (gap <= 0)
+                    break;
+            }
+        }
+        if (gap > 0)
+            throw new NotEnoughBudgetException();
+
+        if (debug) {
+            if (getSystemAvailMB() < getRemainingBudgetMB()) {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+            }
+        }
+    }
+
+    /** Sets the reservation of the consumer to <code>requestMB</code>; returns false if the budget is short. */
+    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
+        lock.lock();
+        try {
+            // no entry is created here: a consumer whose request fails must not stay in the booking
+            ConsumerEntry entry = booking.get(consumer);
+            int curMB = entry == null ? 0 : entry.reservedMB;
+            return updateBookingWithDelta(consumer, requestMB - curMB);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // lock MUST be obtained before entering
+    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
+        if (delta == 0)
+            return true;
+
+        ConsumerEntry entry = booking.get(consumer);
+        // a release for a consumer that holds nothing, nothing to do
+        if (entry == null && delta < 0)
+            return true;
+
+        // double check gap again, it may be changed by other concurrent requests
+        if (delta > 0) {
+            int gap = delta - (totalBudgetMB - totalReservedMB);
+            if (gap > 0)
+                return false;
+        }
+
+        // register the consumer only now that the reservation is known to succeed
+        if (entry == null) {
+            entry = new ConsumerEntry(consumer);
+            booking.put(consumer, entry);
+        }
+
+        totalReservedMB += delta;
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (debug) {
+            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+        }
+
+        // budget was given back, wake up the threads parked in reserveInsist()
+        if (delta < 0) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+
+        return true;
+    }
+
+    /** Returns the max heap size minus the heap currently in use, in bytes. */
+    public static long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
+    }
+
+    /** Returns {@link #getSystemAvailBytes()} in whole MB, capped at Integer.MAX_VALUE. */
+    public static int getSystemAvailMB() {
+        long availMB = getSystemAvailBytes() / ONE_MB;
+        // Runtime.maxMemory() is Long.MAX_VALUE when the heap has no limit; cap the value rather than let the int cast wrap around
+        return (int) Math.min(availMB, Integer.MAX_VALUE);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
new file mode 100644
index 0000000..37eb42d
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.job.inmemcubing.MemDiskStore;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.SimpleGridTableTest;
+import org.junit.Test;
+
+/**
+ * Writes a mock-up data set into a grid table backed by MemDiskStore and reads it back, from one
+ * thread and from several threads sharing one memory budget.
+ */
+public class GTMemDiskStoreTest {
+
+    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+    final GTInfo info = SimpleGridTableTest.advancedInfo();
+    final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadWriteRead() throws IOException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead();
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+
+        int nThreads = 5;
+        // one slot per worker; read only after join(), which makes the worker's write visible
+        final Throwable[] failures = new Throwable[nThreads];
+        Thread[] t = new Thread[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int idx = i;
+            t[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        verifyOneTableWriteAndRead();
+                    } catch (Throwable ex) {
+                        // Throwable, not Exception: a failed assertion is an AssertionError
+                        failures[idx] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        // a failure inside a worker must fail the test, not just show up on the console
+        for (int i = 0; i < nThreads; i++) {
+            if (failures[i] != null)
+                throw new AssertionError("worker thread " + i + " failed", failures[i]);
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    private void verifyOneTableWriteAndRead() throws IOException {
+        MemDiskStore store = new MemDiskStore(info, budgetCtrl);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table);
+    }
+
+    /** Rebuilds the table from the mock-up data, then scans it and expects the very same records in order. */
+    private void verifyWriteAndRead(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        IGTScanner scanner = table.scan(new GTScanRequest(info));
+        int i = 0;
+        try {
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
+        } finally {
+            scanner.close();
+        }
+        // a scan that stops early (or returns nothing) must not pass
+        assertEquals(data.size(), i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..e6a1fb4
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemoryBudgetControllerTest.java
@@ -0,0 +1,98 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController;
+import org.apache.kylin.job.inmemcubing.MemoryBudgetController.NotEnoughBudgetException;
+import org.junit.Test;
+
+public class MemoryBudgetControllerTest {
+
+    @Test
+    public void test() {
+        final int n = MemoryBudgetController.getSystemAvailMB() / 2;
+        final MemoryBudgetController mbc = new MemoryBudgetController(n);
+
+        ArrayList<Consumer> mbList = new ArrayList<Consumer>();
+        for (int i = 0; i < n; i++) {
+            mbList.add(new Consumer(mbc));
+            assertEquals(mbList.size(), mbc.getTotalReservedMB());
+        }
+
+        // a's reservation will free up all the previous
+        final Consumer a = new Consumer();
+        mbc.reserve(a, n);
+        for (int i = 0; i < n; i++) {
+            assertEquals(null, mbList.get(i).data);
+        }
+        
+        // cancel a in 2 seconds
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                mbc.reserve(a, 0);
+            }
+        }.start();
+        
+        // b will success after some wait
+        long bWaitStart = System.currentTimeMillis();
+        final Consumer b = new Consumer();
+        mbc.reserveInsist(b, n);
+        assertTrue(System.currentTimeMillis() - bWaitStart > 1000);
+
+        try {
+            mbc.reserve(a, 1);
+            fail();
+        } catch (NotEnoughBudgetException ex) {
+            // expected
+        }
+    }
+
+    class Consumer implements MemoryBudgetController.MemoryConsumer {
+
+        byte[] data;
+
+        Consumer() {
+        }
+
+        Consumer(MemoryBudgetController mbc) {
+            mbc.reserve(this, 1);
+            data = new byte[MemoryBudgetController.ONE_MB - 24]; // 24 is object shell of this + object shell of data + reference of data 
+        }
+
+        @Override
+        public int freeUp(int mb) {
+            if (data != null) {
+                data = null;
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
deleted file mode 100644
index 65d4905..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
+++ /dev/null
@@ -1,661 +0,0 @@
-package org.apache.kylin.storage.gridtable.memstore;
-
-import static org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.*;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.NoSuchElementException;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController.NotEnoughBudgetException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GTMemDiskStore implements IGTStore, Closeable {
-
-    private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
-    private static final boolean debug = true;
-
-    private static final int STREAM_BUFFER_SIZE = 8192;
-    private static final int MEM_CHUNK_SIZE_MB = 5;
-
-    private final GTInfo info;
-    private final Object lock; // all public methods that read/write object states are synchronized on this lock
-    private final MemPart memPart;
-    private final DiskPart diskPart;
-    private final boolean delOnClose;
-
-    private Writer ongoingWriter;
-
-    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
-        this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
-    }
-
-    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
-        this(info, budgetCtrl, diskFile, false);
-    }
-
-    private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
-        this.info = info;
-        this.lock = this;
-        this.memPart = new MemPart(budgetCtrl);
-        this.diskPart = new DiskPart(diskFile);
-        this.delOnClose = delOnClose;
-
-        // in case user forget to call close()
-        if (delOnClose)
-            diskFile.deleteOnExit();
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public IGTStoreWriter rebuild(int shard) throws IOException {
-        return newWriter(0);
-    }
-
-    @Override
-    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
-        return newWriter(length());
-    }
-
-    private Writer newWriter(long startOffset) throws IOException {
-        synchronized (lock) {
-            if (ongoingWriter != null)
-                throw new IllegalStateException();
-
-            ongoingWriter = new Writer(startOffset);
-            return ongoingWriter;
-        }
-    }
-
-    @Override
-    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
-        synchronized (lock) {
-            return new Reader();
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        // synchronized inside the parts close()
-        memPart.close();
-        diskPart.close();
-    }
-
-    public long length() {
-        synchronized (lock) {
-            return Math.max(memPart.tailOffset(), diskPart.tailOffset);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
-    }
-
-    private class Reader implements IGTStoreScanner {
-
-        final DataInputStream din;
-        long readOffset = 0;
-        long memRead = 0;
-        long diskRead = 0;
-        int nReadCalls = 0;
-
-        GTRowBlock block = GTRowBlock.allocate(info);
-        GTRowBlock next = null;
-
-        Reader() throws IOException {
-            diskPart.openRead();
-            if (debug)
-                logger.debug(GTMemDiskStore.this + " read start @ " + readOffset);
-
-            InputStream in = new InputStream() {
-                byte[] tmp = new byte[1];
-                MemChunk memChunk;
-
-                @Override
-                public int read() throws IOException {
-                    int n = read(tmp, 0, 1);
-                    if (n <= 0)
-                        return -1;
-                    else
-                        return (int) tmp[0];
-                }
-
-                @Override
-                public int read(byte[] b, int off, int len) throws IOException {
-                    synchronized (lock) {
-                        nReadCalls++;
-                        if (available() <= 0)
-                            return -1;
-
-                        if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
-                            memChunk = memPart.seekMemChunk(readOffset);
-                        }
-
-                        int lenToGo = Math.min(available(), len);
-
-                        int nRead = 0;
-                        while (lenToGo > 0) {
-                            int n;
-                            if (memChunk != null) {
-                                if (memChunk.headOffset() > readOffset) {
-                                    memChunk = null;
-                                    continue;
-                                }
-                                if (readOffset >= memChunk.tailOffset()) {
-                                    memChunk = memChunk.next;
-                                    continue;
-                                }
-                                int chunkOffset = (int) (readOffset - memChunk.headOffset());
-                                n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
-                                System.arraycopy(memChunk.data, chunkOffset, b, off, n);
-                                memRead += n;
-                            } else {
-                                n = diskPart.read(readOffset, b, off, lenToGo);
-                                diskRead += n;
-                            }
-                            lenToGo -= n;
-                            nRead += n;
-                            off += n;
-                            readOffset += n;
-                        }
-                        return nRead;
-                    }
-                }
-
-                @Override
-                public int available() throws IOException {
-                    synchronized (lock) {
-                        return (int) (length() - readOffset);
-                    }
-                }
-            };
-
-            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (next != null)
-                return true;
-
-            try {
-                if (din.available() > 0) {
-                    block.importFrom(din);
-                    next = block;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-            return next != null;
-        }
-
-        @Override
-        public GTRowBlock next() {
-            if (next == null) {
-                hasNext();
-                if (next == null)
-                    throw new NoSuchElementException();
-            }
-            GTRowBlock r = next;
-            next = null;
-            return r;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                din.close();
-                diskPart.closeRead();
-                if (debug)
-                    logger.debug(GTMemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
-            }
-        }
-
-    }
-
-    private class Writer implements IGTStoreWriter {
-
-        final DataOutputStream dout;
-        long writeOffset;
-        long memWrite = 0;
-        long diskWrite = 0;
-        int nWriteCalls;
-        boolean closed = false;
-
-        Writer(long startOffset) throws IOException {
-            writeOffset = 0; // TODO does not support append yet
-            memPart.clear();
-            diskPart.clear();
-            diskPart.openWrite(false);
-            if (debug)
-                logger.debug(GTMemDiskStore.this + " write start @ " + writeOffset);
-
-            memPart.activateMemWrite();
-
-            OutputStream out = new OutputStream() {
-                byte[] tmp = new byte[1];
-                boolean memPartActivated = true;
-
-                @Override
-                public void write(int b) throws IOException {
-                    tmp[0] = (byte) b;
-                    write(tmp, 0, 1);
-                }
-
-                @Override
-                public void write(byte[] bytes, int offset, int length) throws IOException {
-                    // lock inside memPart.write() and diskPartm.write()
-                    nWriteCalls++;
-                    while (length > 0) {
-                        int n;
-                        if (memPartActivated) {
-                            n = memPart.write(bytes, offset, length, writeOffset);
-                            memWrite += n;
-                            if (n == 0) {
-                                memPartActivated = false;
-                            }
-                        } else {
-                            n = diskPart.write(writeOffset, bytes, offset, length);
-                            diskWrite += n;
-                        }
-                        offset += n;
-                        length -= n;
-                        writeOffset += n;
-                    }
-                }
-            };
-            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
-        }
-
-        @Override
-        public void write(GTRowBlock block) throws IOException {
-            block.export(dout);
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                if (!closed) {
-                    dout.close();
-                    memPart.deactivateMemWrite();
-                }
-
-                if (memPart.asyncFlusher == null) {
-                    assert writeOffset == diskPart.tailOffset;
-                    diskPart.closeWrite();
-                    ongoingWriter = null;
-                    if (debug)
-                        logger.debug(GTMemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
-                } else {
-                    // the asyncFlusher will call this close() again later
-                }
-                closed = true;
-            }
-        }
-    }
-
-    private static class MemChunk {
-        long diskOffset;
-        int length;
-        byte[] data;
-        MemChunk next;
-
-        boolean isFull() {
-            return length == data.length;
-        }
-
-        long headOffset() {
-            return diskOffset;
-        }
-
-        long tailOffset() {
-            return diskOffset + length;
-        }
-
-        int freeSpace() {
-            return data.length - length;
-        }
-    }
-
-    private class MemPart implements Closeable, MemoryConsumer {
-
-        final MemoryBudgetController budgetCtrl;
-
-        // async flush thread checks this flag out of sync block
-        volatile boolean writeActivated;
-        MemChunk firstChunk;
-        MemChunk lastChunk;
-        int chunkCount;
-
-        Thread asyncFlusher;
-        MemChunk asyncFlushChunk;
-        long asyncFlushDiskOffset;
-        Throwable asyncFlushException;
-
-        MemPart(MemoryBudgetController budgetCtrl) {
-            this.budgetCtrl = budgetCtrl;
-        }
-
-        long headOffset() {
-            return firstChunk == null ? 0 : firstChunk.headOffset();
-        }
-
-        long tailOffset() {
-            return lastChunk == null ? 0 : lastChunk.tailOffset();
-        }
-
-        public MemChunk seekMemChunk(long diskOffset) {
-            MemChunk c = firstChunk;
-            while (c != null && c.headOffset() <= diskOffset) {
-                if (diskOffset < c.tailOffset())
-                    break;
-                c = c.next;
-            }
-            return c;
-        }
-
-        public int write(byte[] bytes, int offset, int length, long diskOffset) {
-            int needMoreMem = 0;
-
-            synchronized (lock) {
-                if (writeActivated == false)
-                    return 0;
-
-                // write is only expected at the tail
-                if (diskOffset != tailOffset())
-                    return 0;
-
-                if (chunkCount == 0 || lastChunk.isFull())
-                    needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
-            }
-
-            // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
-            if (needMoreMem > 0) {
-                try {
-                    budgetCtrl.reserve(this, needMoreMem);
-                } catch (NotEnoughBudgetException ex) {
-                    deactivateMemWrite();
-                    return 0;
-                }
-            }
-
-            synchronized (lock) {
-                if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
-                    MemChunk chunk = new MemChunk();
-                    chunk.diskOffset = diskOffset;
-                    chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
-                    if (chunkCount == 0) {
-                        firstChunk = lastChunk = chunk;
-                    } else {
-                        lastChunk.next = chunk;
-                        lastChunk = chunk;
-                    }
-                    chunkCount++;
-                }
-
-                int n = Math.min(lastChunk.freeSpace(), length);
-                System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
-                lastChunk.length += n;
-
-                if (n > 0)
-                    asyncFlush(lastChunk, diskOffset, n);
-
-                return n;
-            }
-        }
-
-        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
-            if (asyncFlushChunk == null) {
-                asyncFlushChunk = lastChunk;
-                asyncFlushDiskOffset = diskOffset;
-            }
-
-            if (asyncFlusher == null) {
-                asyncFlusher = new Thread() {
-                    public void run() {
-                        asyncFlushException = null;
-                        if (debug)
-                            logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
-                        try {
-                            while (writeActivated) {
-                                flushToDisk();
-                                Thread.sleep(10);
-                            }
-                            flushToDisk();
-
-                            if (debug)
-                                logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
-
-                            synchronized (lock) {
-                                asyncFlusher = null;
-                                asyncFlushChunk = null;
-                                if (ongoingWriter.closed) {
-                                    ongoingWriter.close(); // call writer.close() again to clean up
-                                }
-                            }
-                        } catch (Throwable ex) {
-                            asyncFlushException = ex;
-                        }
-                    }
-                };
-                asyncFlusher.start();
-            }
-        }
-
-        private void flushToDisk() throws IOException {
-            byte[] data;
-            int offset = 0;
-            int length = 0;
-            int flushedLen = 0;
-
-            while (true) {
-                data = null;
-                synchronized (lock) {
-                    asyncFlushDiskOffset += flushedLen; // bytes written in last loop
-                    //                    if (debug)
-                    //                        logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
-                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
-                        asyncFlushChunk = asyncFlushChunk.next;
-                    }
-                    if (asyncFlushChunk != null) {
-                        data = asyncFlushChunk.data;
-                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
-                        length = asyncFlushChunk.length - offset;
-                    }
-                }
-
-                if (data == null)
-                    break;
-
-                flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
-            }
-        }
-
-        @Override
-        public int freeUp(int mb) {
-            synchronized (lock) {
-                int mbReleased = 0;
-                while (chunkCount > 0 && mbReleased < mb) {
-                    if (firstChunk == asyncFlushChunk)
-                        break;
-
-                    mbReleased += MEM_CHUNK_SIZE_MB;
-                    chunkCount--;
-                    if (chunkCount == 0) {
-                        firstChunk = lastChunk = null;
-                    } else {
-                        MemChunk next = firstChunk.next;
-                        firstChunk.next = null;
-                        firstChunk = next;
-                    }
-                }
-                return mbReleased;
-            }
-        }
-
-        public void activateMemWrite() {
-            if (budgetCtrl.getTotalBudgetMB() > 0) {
-                writeActivated = true;
-                if (debug)
-                    logger.debug(GTMemDiskStore.this + " mem write activated");
-            }
-        }
-
-        public void deactivateMemWrite() {
-            writeActivated = false;
-            if (debug)
-                logger.debug(GTMemDiskStore.this + " mem write de-activated");
-        }
-
-        public void clear() {
-            chunkCount = 0;
-            firstChunk = lastChunk = null;
-            budgetCtrl.reserve(this, 0);
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                if (asyncFlushException != null)
-                    throwAsyncException(asyncFlushException);
-            }
-            try {
-                asyncFlusher.join();
-            } catch (NullPointerException npe) {
-                // that's fine, async flusher may not present
-            } catch (InterruptedException e) {
-                logger.warn("async join interrupted", e);
-            }
-            synchronized (lock) {
-                if (asyncFlushException != null)
-                    throwAsyncException(asyncFlushException);
-
-                clear();
-            }
-        }
-
-        private void throwAsyncException(Throwable ex) throws IOException {
-            if (ex instanceof IOException)
-                throw (IOException) ex;
-            else
-                throw new IOException(ex);
-        }
-
-        @Override
-        public String toString() {
-            return GTMemDiskStore.this.toString();
-        }
-
-    }
-
-    private class DiskPart implements Closeable {
-        final File diskFile;
-        FileChannel writeChannel;
-        FileChannel readChannel;
-        int readerCount = 0; // allow parallel readers
-        long tailOffset;
-
-        DiskPart(File diskFile) throws IOException {
-            this.diskFile = diskFile;
-            this.tailOffset = diskFile.length();
-            if (debug)
-                logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
-        }
-
-        public void openRead() throws IOException {
-            if (readChannel == null) {
-                readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
-            }
-            readerCount++;
-        }
-
-        public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
-        }
-
-        public void closeRead() throws IOException {
-            closeRead(false);
-        }
-
-        private void closeRead(boolean force) throws IOException {
-            readerCount--;
-            if (readerCount == 0 || force) {
-                if (readChannel != null) {
-                    readChannel.close();
-                    readChannel = null;
-                }
-            }
-        }
-
-        public void openWrite(boolean append) throws IOException {
-            if (append) {
-                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
-                tailOffset = diskFile.length();
-            } else {
-                diskFile.delete();
-                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
-                tailOffset = 0;
-            }
-        }
-
-        public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            synchronized (lock) {
-                int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
-                tailOffset = Math.max(diskOffset + n, tailOffset);
-                return n;
-            }
-        }
-
-        public void closeWrite() throws IOException {
-            if (writeChannel != null) {
-                writeChannel.close();
-                writeChannel = null;
-            }
-        }
-
-        public void clear() throws IOException {
-            diskFile.delete();
-            tailOffset = 0;
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                closeWrite();
-                closeRead(true);
-                if (delOnClose) {
-                    diskFile.delete();
-                }
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
deleted file mode 100644
index c4efa75..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemoryBudgetController.java
+++ /dev/null
@@ -1,233 +0,0 @@
-package org.apache.kylin.storage.gridtable.memstore;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemoryBudgetController {
-
-    private static final boolean debug = true;
-
-    public static interface MemoryConsumer {
-        // return number MB released
-        int freeUp(int mb);
-    }
-
-    @SuppressWarnings("serial")
-    public static class NotEnoughBudgetException extends IllegalStateException {
-
-        public NotEnoughBudgetException() {
-            super();
-        }
-
-        public NotEnoughBudgetException(Throwable cause) {
-            super(cause);
-        }
-    }
-
-    private static class ConsumerEntry {
-        final MemoryConsumer consumer;
-        int reservedMB;
-
-        ConsumerEntry(MemoryConsumer consumer) {
-            this.consumer = consumer;
-        }
-    }
-
-    public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0);
-    public static final int ONE_MB = 1024 * 1024;
-
-    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
-
-    // all budget numbers are in MB
-    private final int totalBudgetMB;
-    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
-    private int totalReservedMB;
-    private final ReentrantLock lock = new ReentrantLock();
-
-    public MemoryBudgetController(int totalBudgetMB) {
-        if (totalBudgetMB < 0)
-            throw new IllegalArgumentException();
-        if (totalBudgetMB > getSystemAvailMB())
-            throw new IllegalStateException();
-
-        this.totalBudgetMB = totalBudgetMB;
-        this.totalReservedMB = 0;
-    }
-
-    public int getTotalBudgetMB() {
-        return totalBudgetMB;
-    }
-
-    public int getTotalReservedMB() {
-        lock.lock();
-        try {
-            return totalReservedMB;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public int getRemainingBudgetMB() {
-        lock.lock();
-        try {
-            return totalBudgetMB - totalReservedMB;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public void reserveInsist(MemoryConsumer consumer, int requestMB) {
-        long waitStart = 0;
-        while (true) {
-            try {
-                reserve(consumer, requestMB);
-                if (debug && waitStart > 0)
-                    logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request");
-                return;
-            } catch (NotEnoughBudgetException ex) {
-                // retry
-            }
-
-            if (waitStart == 0)
-                waitStart = System.currentTimeMillis();
-
-            synchronized (lock) {
-                try {
-                    lock.wait();
-                } catch (InterruptedException e) {
-                    throw new NotEnoughBudgetException(e);
-                }
-            }
-        }
-    }
-
-    /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */
-    public void reserve(MemoryConsumer consumer, int requestMB) {
-        if (totalBudgetMB == 0 && requestMB > 0)
-            throw new NotEnoughBudgetException();
-
-        boolean ok = false;
-        while (!ok) {
-            int gap = calculateGap(consumer, requestMB);
-            if (gap > 0) {
-                // to void deadlock, don't hold lock when invoking consumer.freeUp()
-                tryFreeUp(gap);
-            }
-            ok = updateBooking(consumer, requestMB);
-        }
-    }
-
-    private int calculateGap(MemoryConsumer consumer, int requestMB) {
-        lock.lock();
-        try {
-            ConsumerEntry entry = booking.get(consumer);
-            int curMB = entry == null ? 0 : entry.reservedMB;
-            int delta = requestMB - curMB;
-            return delta - (totalBudgetMB - totalReservedMB);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    private void tryFreeUp(int gap) {
-        // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock
-        for (ConsumerEntry entry : booking.values()) {
-            int mb = entry.consumer.freeUp(gap);
-            if (mb > 0) {
-                lock.lock();
-                try {
-                    updateBookingWithDelta(entry.consumer, -mb);
-                } finally {
-                    lock.unlock();
-                }
-                gap -= mb;
-                if (gap <= 0)
-                    break;
-            }
-        }
-        if (gap > 0)
-            throw new NotEnoughBudgetException();
-
-        if (debug) {
-            if (getSystemAvailMB() < getRemainingBudgetMB()) {
-                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
-            }
-        }
-    }
-
-    private boolean updateBooking(MemoryConsumer consumer, int requestMB) {
-        lock.lock();
-        try {
-            ConsumerEntry entry = booking.get(consumer);
-            if (entry == null) {
-                if (requestMB == 0)
-                    return true;
-
-                entry = new ConsumerEntry(consumer);
-                booking.put(consumer, entry);
-            }
-
-            int delta = requestMB - entry.reservedMB;
-            return updateBookingWithDelta(consumer, delta);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    // lock MUST be obtained before entering
-    private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) {
-        if (delta == 0)
-            return true;
-
-        ConsumerEntry entry = booking.get(consumer);
-        if (entry == null) {
-            if (delta <= 0)
-                return true;
-
-            entry = new ConsumerEntry(consumer);
-            booking.put(consumer, entry);
-        }
-
-        // double check gap again, it may be changed by other concurrent requests
-        if (delta > 0) {
-            int gap = delta - (totalBudgetMB - totalReservedMB);
-            if (gap > 0)
-                return false;
-        }
-
-        totalReservedMB += delta;
-        entry.reservedMB += delta;
-        if (entry.reservedMB == 0) {
-            booking.remove(entry.consumer);
-        }
-        if (debug) {
-            logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
-        }
-
-        if (delta < 0) {
-            synchronized (lock) {
-                lock.notifyAll();
-            }
-        }
-
-        return true;
-    }
-
-    public static long getSystemAvailBytes() {
-        Runtime runtime = Runtime.getRuntime();
-        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
-        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
-        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
-        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
-        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
-        return availableMemory;
-    }
-
-    public static int getSystemAvailMB() {
-        return (int) (getSystemAvailBytes() / ONE_MB);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
index 2cafde0..6520795 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.kylin.storage.gridtable;
 
 import java.math.BigDecimal;
@@ -15,10 +32,8 @@ import org.apache.kylin.metadata.measure.DoubleSumAggregator;
 import org.apache.kylin.metadata.measure.HLLCAggregator;
 import org.apache.kylin.metadata.measure.LongSumAggregator;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
 import org.junit.Test;
 
-/** Note: Execute each test alone to get accurate size estimate. */
 public class AggregationCacheMemSizeTest {
 
     public static final int NUM_OF_OBJS = 1000000 / 2;
@@ -182,7 +197,17 @@ public class AggregationCacheMemSizeTest {
     private long memLeft() throws InterruptedException {
         Runtime.getRuntime().gc();
         Thread.sleep(500);
-        return MemoryBudgetController.getSystemAvailBytes();
+        return getSystemAvailBytes();
+    }
+
+    private long getSystemAvailBytes() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        return availableMemory;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index d9d1eff..0dec3c3 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.kylin.storage.gridtable;
 
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/765dfca8/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
deleted file mode 100644
index 4441687..0000000
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.kylin.storage.gridtable;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
-import org.apache.kylin.storage.gridtable.memstore.MemoryBudgetController;
-import org.junit.Test;
-
-public class GTMemDiskStoreTest {
-
-    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
-    final GTInfo info = SimpleGridTableTest.advancedInfo();
-    final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
-
-    @Test
-    public void testSingleThreadWriteRead() throws IOException {
-        long start = System.currentTimeMillis();
-        verifyOneTableWriteAndRead();
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-
-    @Test
-    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-
-        int nThreads = 5;
-        Thread[] t = new Thread[nThreads];
-        for (int i = 0; i < nThreads; i++) {
-            t[i] = new Thread() {
-                public void run() {
-                    try {
-                        verifyOneTableWriteAndRead();
-                    } catch (Exception ex) {
-                        ex.printStackTrace();
-                    }
-                }
-            };
-            t[i].start();
-        }
-        for (int i = 0; i < nThreads; i++) {
-            t[i].join();
-        }
-
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-
-    private void verifyOneTableWriteAndRead() throws IOException {
-        GTMemDiskStore store = new GTMemDiskStore(info, budgetCtrl);
-        GridTable table = new GridTable(info, store);
-        verifyWriteAndRead(table);
-    }
-
-    private void verifyWriteAndRead(GridTable table) throws IOException {
-        GTInfo info = table.getInfo();
-
-        GTBuilder builder = table.rebuild();
-        for (GTRecord r : data) {
-            builder.write(r);
-        }
-        builder.close();
-
-        IGTScanner scanner = table.scan(new GTScanRequest(info));
-        int i = 0;
-        for (GTRecord r : scanner) {
-            assertEquals(data.get(i++), r);
-        }
-        scanner.close();
-    }
-}