You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/28 15:44:13 UTC

incubator-kylin git commit: KYLIN-786, MemDiskStore that holds cuboid data in memory as much as possible

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 61779f08c -> 4631e40a7


KYLIN-786, MemDiskStore that holds cuboid data in memory as much as possible


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4631e40a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4631e40a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4631e40a

Branch: refs/heads/0.8.0
Commit: 4631e40a7931d93ec681332d2bfbd9b41f0613f5
Parents: 61779f0
Author: Li, Yang <ya...@ebay.com>
Authored: Wed May 27 20:29:02 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu May 28 21:41:08 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     | 142 +++++
 .../common/util/MemoryBudgetControllerTest.java |  60 ++
 .../kylin/storage/gridtable/GTRowBlock.java     |  88 +--
 .../gridtable/memstore/GTMemDiskStore.java      | 573 +++++++++++++++++++
 .../storage/gridtable/DictGridTableTest.java    |   2 +-
 .../storage/gridtable/GTMemDiskStoreTest.java   |  74 +++
 .../storage/gridtable/SimpleGridTableTest.java  |  80 ++-
 7 files changed, 960 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
new file mode 100644
index 0000000..06d3cd2
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -0,0 +1,142 @@
+package org.apache.kylin.common.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBudgetController {
+
+    public static interface MemoryConsumer {
+        // return number MB released
+        int freeUp(int mb);
+    }
+
+    @SuppressWarnings("serial")
+    public static class NotEnoughBudgetException extends IllegalStateException {
+    }
+
+    private static class ConsumerEntry {
+        final MemoryConsumer consumer;
+        int reservedMB;
+
+        ConsumerEntry(MemoryConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    public static final int ONE_MB = 1024 * 1024;
+    public static final int SYSTEM_RESERVED = 200;
+
+    private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class);
+
+    // all budget numbers are in MB
+    private final int totalBudgetMB;
+    private final AtomicInteger totalReservedMB;
+    private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>();
+
+
+    public MemoryBudgetController(int totalBudgetMB) {
+        if (totalBudgetMB < 0)
+            throw new IllegalArgumentException();
+        if (checkSystemAvailMB(totalBudgetMB) == false)
+            throw new IllegalStateException();
+
+        this.totalBudgetMB = totalBudgetMB;
+        this.totalReservedMB = new AtomicInteger();
+    }
+
+    public int getTotalBudgetMB() {
+        return totalBudgetMB;
+    }
+
+    public int getTotalReservedMB() {
+        return totalReservedMB.get();
+    }
+
+    public int getRemainingBudgetMB() {
+        return totalBudgetMB - totalReservedMB.get();
+    }
+
+    public void reserve(MemoryConsumer consumer, int requestMB) {
+        if (totalBudgetMB == 0 && requestMB > 0)
+            throw new NotEnoughBudgetException();
+        
+        ConsumerEntry entry = booking.get(consumer);
+        if (entry == null) {
+            booking.putIfAbsent(consumer, new ConsumerEntry(consumer));
+            entry = booking.get(consumer);
+        }
+
+        int delta = requestMB - entry.reservedMB;
+
+        if (delta > 0) {
+            checkFreeMemoryAndUpdateBooking(entry, delta);
+        } else {
+            updateBooking(entry, delta);
+        }
+    }
+
+    synchronized private void updateBooking(ConsumerEntry entry, int delta) {
+        totalReservedMB.addAndGet(delta);
+        entry.reservedMB += delta;
+        if (entry.reservedMB == 0) {
+            booking.remove(entry.consumer);
+        }
+        if (delta < 0) {
+            this.notify();
+        }
+        logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
+    }
+
+    private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
+        while (true) {
+            // if budget is not enough, try free up 
+            while (delta > totalBudgetMB - totalReservedMB.get()) {
+                int freeUpToGo = delta;
+                for (ConsumerEntry entry : booking.values()) {
+                    int mb = entry.consumer.freeUp(freeUpToGo);
+                    updateBooking(entry, -mb);
+                    freeUpToGo -= mb;
+                    if (freeUpToGo <= 0)
+                        break;
+                }
+                if (freeUpToGo > 0)
+                    throw new NotEnoughBudgetException();
+            }
+
+            if (checkSystemAvailMB(delta))
+                break;
+
+            try {
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+                this.wait(200);
+            } catch (InterruptedException e) {
+                logger.error("Interrupted while wait free memory", e);
+            }
+        }
+
+        updateBooking(consumer, delta);
+    }
+
+    private boolean checkSystemAvailMB(int mb) {
+        return getSystemAvailMB() - SYSTEM_RESERVED >= mb;
+    }
+
+    public static int getSystemAvailMB() {
+        Runtime runtime = Runtime.getRuntime();
+        long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process
+        long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free
+        long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting
+        long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using
+        long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used
+        int availMB = (int) (availableMemory / ONE_MB);
+        return availMB;
+    }
+
+    public static int getMaxPossibleBudget() {
+        return getSystemAvailMB() - SYSTEM_RESERVED - 1; // -1 for some extra buffer
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
new file mode 100644
index 0000000..bc2eadd
--- /dev/null
+++ b/common/src/test/java/org/apache/kylin/common/util/MemoryBudgetControllerTest.java
@@ -0,0 +1,60 @@
+package org.apache.kylin.common.util;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Test;
+
+public class MemoryBudgetControllerTest {
+
+    /**
+     * Fills half of the possible budget with 1 MB consumers. One big reservation must then evict all of them,
+     * and a further reservation must be rejected once nothing is left to free.
+     */
+    @Test
+    public void test() {
+        final int budgetMB = MemoryBudgetController.getMaxPossibleBudget() / 2;
+        final MemoryBudgetController controller = new MemoryBudgetController(budgetMB);
+
+        final ArrayList<OneMB> holders = new ArrayList<OneMB>();
+        while (holders.size() < budgetMB) {
+            holders.add(new OneMB(controller));
+            assertEquals(holders.size(), controller.getTotalReservedMB());
+        }
+
+        // a reservation as large as the whole budget forces every holder to give its megabyte back
+        controller.reserve(new OneMB(), budgetMB);
+        for (OneMB holder : holders) {
+            assertEquals(null, holder.data);
+        }
+
+        // nothing can be freed any more, so even a 1 MB request must be refused
+        try {
+            controller.reserve(new OneMB(), 1);
+            fail();
+        } catch (IllegalStateException expected) {
+            // expected
+        }
+    }
+
+    /** Consumer holding roughly one megabyte, which it drops on the first freeUp() call. */
+    class OneMB implements MemoryBudgetController.MemoryConsumer {
+
+        byte[] data;
+
+        OneMB() {
+        }
+
+        OneMB(MemoryBudgetController mbc) {
+            mbc.reserve(this, 1);
+            // 24 is object shell of this + object shell of data + reference of data
+            data = new byte[MemoryBudgetController.ONE_MB - 24];
+        }
+
+        @Override
+        public int freeUp(int mb) {
+            if (data == null)
+                return 0;
+            data = null;
+            return 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 4a68659..eed5698 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.gridtable;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -40,36 +41,36 @@ public class GTRowBlock {
             this.cellBlocks[i] = new ByteArray();
         }
     }
-    
+
     public int getSequenceId() {
         return seqId;
     }
-    
+
     public ByteArray getPrimaryKey() {
         return primaryKey;
     }
-    
+
     public ByteArray getCellBlock(int i) {
         return cellBlocks[i];
     }
-    
+
     public Writer getWriter() {
         return new Writer();
     }
 
     public class Writer {
         ByteBuffer[] cellBlockBuffers;
-        
+
         Writer() {
             cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
             for (int i = 0; i < cellBlockBuffers.length; i++) {
                 cellBlockBuffers[i] = cellBlocks[i].asBuffer();
             }
         }
-        
+
         public void copyFrom(GTRowBlock other) {
             assert info == other.info;
-            
+
             seqId = other.seqId;
             nRows = other.nRows;
             primaryKey.copyFrom(other.primaryKey);
@@ -89,7 +90,7 @@ public class GTRowBlock {
             }
             nRows++;
         }
-        
+
         public void readyForFlush() {
             for (int i = 0; i < cellBlocks.length; i++) {
                 cellBlocks[i].setLength(cellBlockBuffers[i].position());
@@ -104,21 +105,21 @@ public class GTRowBlock {
             }
         }
     }
-    
+
     public Reader getReader() {
         return new Reader(info.colBlocksAll);
     }
-    
+
     public Reader getReader(BitSet selectedColBlocks) {
         return new Reader(selectedColBlocks);
     }
-    
+
     public class Reader {
         int cur;
         ByteBuffer primaryKeyBuffer;
         ByteBuffer[] cellBlockBuffers;
         BitSet selectedColBlocks;
-        
+
         Reader(BitSet selectedColBlocks) {
             primaryKeyBuffer = primaryKey.asBuffer();
             cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
@@ -127,15 +128,15 @@ public class GTRowBlock {
             }
             this.selectedColBlocks = selectedColBlocks;
         }
-        
+
         public boolean hasNext() {
             return cur < nRows;
         }
-        
+
         public void fetchNext(GTRecord result) {
             if (hasNext() == false)
                 throw new IllegalArgumentException();
-            
+
             for (int c = selectedColBlocks.nextSetBit(0); c >= 0; c = selectedColBlocks.nextSetBit(c + 1)) {
                 result.loadCellBlock(c, cellBlockBuffers[c]);
             }
@@ -153,7 +154,7 @@ public class GTRowBlock {
 
         return copy;
     }
-    
+
     public boolean isEmpty() {
         return nRows == 0;
     }
@@ -164,7 +165,7 @@ public class GTRowBlock {
         else
             return nRows > 0;
     }
-    
+
     public int getNumberOfRows() {
         return nRows;
     }
@@ -172,36 +173,41 @@ public class GTRowBlock {
     public void setNumberOfRows(int nRows) {
         this.nRows = nRows;
     }
-
-    // TODO export / load should optimize for disabled row block
     
+    // ============================================================================
+
     public int exportLength() {
-        int len = 4 + 4 + (4 + primaryKey.length());
+        int len = 4; // seq Id
+        if (info.isRowBlockEnabled())
+            len += 4; // nRows
+        len += 4 + primaryKey.length(); // PK byte array
         for (ByteArray array : cellBlocks) {
-            len += 4 + array.length();
+            len += 4 + array.length(); // cell block byte array
         }
         return len;
     }
 
-    public void export(DataOutputStream dataOutputStream) throws IOException {
-        dataOutputStream.writeInt(seqId);
-        dataOutputStream.writeInt(nRows);
-        export(dataOutputStream, primaryKey);
+    /** write data to given output stream, like serialize */
+    public void export(DataOutputStream out) throws IOException {
+        out.writeInt(seqId);
+        if (info.isRowBlockEnabled())
+            out.writeInt(nRows);
+        export(out, primaryKey);
         for (ByteArray cb : cellBlocks) {
-            export(dataOutputStream, cb);
+            export(out, cb);
         }
     }
 
-    private void export(DataOutputStream dataOutputStream, ByteArray array) throws IOException {
-        dataOutputStream.writeInt(array.length());
-        dataOutputStream.write(array.array(), array.offset(), array.length());
+    private void export(DataOutputStream out, ByteArray array) throws IOException {
+        out.writeInt(array.length());
+        out.write(array.array(), array.offset(), array.length());
     }
 
-
     /** write data to given buffer, like serialize */
     public void export(ByteBuffer buf) {
         buf.putInt(seqId);
-        buf.putInt(nRows);
+        if (info.isRowBlockEnabled())
+            buf.putInt(nRows);
         export(primaryKey, buf);
         for (ByteArray cb : cellBlocks) {
             export(cb, buf);
@@ -212,11 +218,29 @@ public class GTRowBlock {
         buf.putInt(array.length());
         buf.put(array.array(), array.offset(), array.length());
     }
+    
+    /**
+     * Reads data from the given input stream, like deserialize; the inverse of export(DataOutputStream).
+     * nRows is only present in the stream when row blocks are enabled; otherwise it is set to 1.
+     *
+     * @throws java.io.EOFException if the stream ends before the whole row block was read
+     */
+    public void importFrom(DataInputStream in) throws IOException {
+        seqId = in.readInt();
+        nRows = info.isRowBlockEnabled() ? in.readInt() : 1;
+        importFrom(in, primaryKey);
+        for (int i = 0; i < info.colBlocks.length; i++) {
+            importFrom(in, cellBlocks[i]);
+        }
+    }
+
+    /** Reads one length-prefixed byte array into {@code result}, reusing its backing array when it is large enough. */
+    private void importFrom(DataInputStream in, ByteArray result) throws IOException {
+        int len = in.readInt();
+        byte[] data = result.array();
+        if (data == null || data.length < len)
+            data = new byte[len];
+        // read() may return fewer bytes than requested; readFully() loops until len bytes are in or throws EOF
+        in.readFully(data, 0, len);
+        result.set(data, 0, len);
+    }
 
     /** change pointers to point to data in given buffer, UNLIKE deserialize */
     public void load(ByteBuffer buf) {
         seqId = buf.getInt();
-        nRows = buf.getInt();
+        nRows = info.isRowBlockEnabled() ? buf.getInt() : 1;
         load(primaryKey, buf);
         for (int i = 0; i < info.colBlocks.length; i++) {
             ByteArray cb = cellBlocks[i];

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
new file mode 100644
index 0000000..4ae4f5c
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -0,0 +1,573 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.BitSet;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GTMemDiskStore implements IGTStore, Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
+    private static final int STREAM_BUFFER_SIZE = 8192; // buffer size of the Reader/Writer data streams
+    private static final int MEM_CHUNK_SIZE_MB = 1; // budget reserved per MemChunk
+
+    final GTInfo info;
+    final MemPart memPart; // in-memory copy of (part of) the disk file
+    final DiskPart diskPart; // complete backing file
+
+    /** Creates a store backed by a new temp file that is marked for deletion when the JVM exits. */
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+        this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+    }
+
+    /** Creates a store backed by the given file, which is left in place on exit. */
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+        this(info, budgetCtrl, diskFile, false);
+    }
+
+    private GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnExit) throws IOException {
+        this.info = info;
+        this.memPart = new MemPart(budgetCtrl);
+        this.diskPart = new DiskPart(diskFile);
+
+        if (delOnExit) {
+            diskFile.deleteOnExit();
+        }
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /** Starts writing from offset 0; the shard argument is not used. */
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        final long fromStart = 0;
+        return new Writer(fromStart);
+    }
+
+    /**
+     * NOTE(review): Writer ignores its start offset and clears memPart and diskPart, so this currently rewrites
+     * the store instead of appending; shard and fillLast are not used.
+     */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        final long tail = diskPart.tailOffset;
+        return new Writer(tail);
+    }
+
+    /** Scans every row block; the pk range, column selection and push-down arguments are not applied here. */
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return new Reader();
+    }
+
+    @Override
+    public void close() throws IOException {
+        memPart.close();
+        diskPart.close();
+    }
+
+    @Override
+    public String toString() {
+        return "MemDiskStore@" + hashCode();
+    }
+
+    private class Reader implements IGTStoreScanner {
+
+        final DataInputStream din;
+        long diskOffset = 0;
+        long memRead = 0;
+        long diskRead = 0;
+        int nReadCalls = 0;
+
+        GTRowBlock block = GTRowBlock.allocate(info);
+        GTRowBlock next = null;
+
+        Reader() throws IOException {
+            diskPart.openRead();
+            logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+
+            InputStream in = new InputStream() {
+                byte[] tmp = new byte[1];
+                MemChunk memChunk;
+
+                @Override
+                public int read() throws IOException {
+                    int n = read(tmp, 0, 1);
+                    if (n <= 0)
+                        return -1;
+                    else
+                        return (int) tmp[0];
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    nReadCalls++;
+                    if (available() <= 0)
+                        return -1;
+
+                    if (memChunk == null && memPart.headOffset() <= diskOffset && diskOffset < memPart.tailOffset()) {
+                        memChunk = memPart.seekMemChunk(diskOffset);
+                    }
+
+                    int lenToGo = Math.min(available(), len);
+
+                    int nRead = 0;
+                    while (lenToGo > 0) {
+                        int n;
+                        if (memChunk != null) {
+                            if (memChunk.headOffset() > diskOffset) {
+                                memChunk = null;
+                                continue;
+                            }
+                            if (diskOffset >= memChunk.tailOffset()) {
+                                memChunk = memChunk.next;
+                                continue;
+                            }
+                            int chunkOffset = (int) (diskOffset - memChunk.headOffset());
+                            n = Math.min((int) (memChunk.tailOffset() - diskOffset), lenToGo);
+                            System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+                            memRead += n;
+                        } else {
+                            n = diskPart.read(diskOffset, b, off, lenToGo);
+                            diskRead += n;
+                        }
+                        lenToGo -= n;
+                        nRead += n;
+                        off += n;
+                        diskOffset += n;
+                    }
+                    return nRead;
+                }
+
+                @Override
+                public int available() throws IOException {
+                    return (int) (diskPart.tailOffset - diskOffset);
+                }
+            };
+
+            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            try {
+                if (din.available() > 0) {
+                    block.importFrom(din);
+                    next = block;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            return next != null;
+        }
+
+        @Override
+        public GTRowBlock next() {
+            if (next == null) {
+                hasNext();
+                if (next == null)
+                    throw new NoSuchElementException();
+            }
+            GTRowBlock r = next;
+            next = null;
+            return r;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            din.close();
+            diskPart.closeRead();
+            logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+        }
+
+    }
+
+    /**
+     * Writes row blocks through a buffered data stream. Bytes first go to memPart, which also schedules them
+     * for async flush to disk. Once memPart.write() returns 0, all remaining bytes are written directly to
+     * diskPart.
+     */
+    private class Writer implements IGTStoreWriter {
+
+        final DataOutputStream dout;
+        long diskOffset; // offset of the next byte to write
+        long memWrite = 0;
+        long diskWrite = 0;
+        int nWriteCalls;
+
+        // NOTE(review): startOffset is ignored; any existing memory/disk content is cleared and writing restarts at 0
+        Writer(long startOffset) throws IOException {
+            diskOffset = 0; // TODO does not support append yet
+            memPart.clear();
+            diskPart.clear();
+            diskPart.openWrite(false);
+            logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+
+            memPart.activateMemWrite();
+
+            OutputStream out = new OutputStream() {
+                byte[] tmp = new byte[1];
+                // becomes false the first time memPart refuses bytes and is never set back to true
+                boolean memPartActivated = true;
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    nWriteCalls++;
+                    while (length > 0) {
+                        int n;
+                        if (memPartActivated) {
+                            // memPart may take only part of the bytes (chunk full), or none (switch to disk)
+                            n = memPart.write(bytes, offset, length, diskOffset);
+                            memWrite += n;
+                            if (n == 0) {
+                                memPartActivated = false;
+                            }
+                        } else {
+                            n = diskPart.write(diskOffset, bytes, offset, length);
+                            diskWrite += n;
+                        }
+                        offset += n;
+                        length -= n;
+                        diskOffset += n;
+                    }
+                }
+            };
+            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        /** Flushes buffered bytes, waits for the async flusher to finish, and then closes the disk file for writing. */
+        @Override
+        public void close() throws IOException {
+            dout.close();
+            memPart.finishAsyncFlush();
+            diskPart.closeWrite();
+            assert diskOffset == diskPart.tailOffset;
+            logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+        }
+    }
+
+    /** Fixed-capacity in-memory copy of a contiguous byte range of the disk file; chunks form a singly linked list. */
+    private static class MemChunk {
+        long diskOffset; // disk offset of data[0]
+        int length; // number of valid bytes in data
+        byte[] data;
+        MemChunk next;
+
+        long headOffset() {
+            return diskOffset;
+        }
+
+        long tailOffset() {
+            return headOffset() + length;
+        }
+
+        int freeSpace() {
+            return data.length - length;
+        }
+
+        boolean isFull() {
+            return freeSpace() == 0;
+        }
+    }
+
+    private class MemPart implements Closeable, MemoryConsumer {
+
+        // controller that every chunk's MEM_CHUNK_SIZE_MB is reserved against
+        final MemoryBudgetController budgetCtrl;
+
+        // read & write won't go together, but write() / flushToDisk() (async flusher thread) / freeUp() can happen at the same time
+        volatile boolean writeActivated; // while true, write() accepts bytes and the async flusher keeps polling
+        MemChunk firstChunk; // oldest cached chunk; freeUp() releases from here
+        MemChunk lastChunk; // chunk that write() appends to
+        int chunkCount;
+
+        Thread asyncFlusher; // started lazily by asyncFlush(), joined by finishAsyncFlush()
+        MemChunk asyncFlushChunk; // chunk the flusher writes next; null once it has caught up with write()
+        long asyncFlushDiskOffset; // disk offset up to which the flusher has written
+        Throwable asyncFlushException; // failure of the flusher thread, rethrown by finishAsyncFlush()
+
+        MemPart(MemoryBudgetController budgetCtrl) {
+            this.budgetCtrl = budgetCtrl;
+        }
+
+        /** Disk offset of the first cached byte, or 0 when nothing is cached. */
+        long headOffset() {
+            // read the field once: freeUp() may null it concurrently, and a second read could then NPE
+            MemChunk first = firstChunk;
+            return first != null ? first.headOffset() : 0;
+        }
+
+        /** Disk offset just past the last cached byte, or 0 when nothing is cached. */
+        long tailOffset() {
+            MemChunk last = lastChunk;
+            return last != null ? last.tailOffset() : 0;
+        }
+
+        synchronized public MemChunk seekMemChunk(long diskOffset) {
+            MemChunk c = firstChunk;
+            while (c != null && c.headOffset() <= diskOffset) {
+                if (diskOffset < c.tailOffset())
+                    break;
+                c = c.next;
+            }
+            return c;
+        }
+
+        /**
+         * Appends up to {@code length} bytes, located at {@code diskOffset}, to the last memory chunk. A new
+         * chunk is allocated (after reserving budget) when needed, and the accepted bytes are scheduled for
+         * async flush.
+         *
+         * @return number of bytes taken into memory. 0 means memory cannot be used at this offset (deactivated,
+         *         not at the tail, out of budget, or chunks freed concurrently); the caller should write to disk.
+         */
+        public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            int needMoreMem = 0;
+
+            synchronized (this) {
+                if (writeActivated == false)
+                    return 0;
+
+                // write is only expected at the tail
+                if (diskOffset != tailOffset())
+                    return 0;
+
+                if (chunkCount == 0 || lastChunk.isFull())
+                    needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
+            }
+
+            // call to budgetCtrl.reserve() out of synchronized block, or deadlock may happen between MemoryConsumers
+            if (needMoreMem > 0) {
+                try {
+                    budgetCtrl.reserve(this, needMoreMem);
+                } catch (NotEnoughBudgetException ex) {
+                    deactivateMemWrite();
+                    return 0;
+                }
+            }
+
+            synchronized (this) {
+                // freeUp() may have run while the lock was released and dropped the chunk we meant to append to
+                if (needMoreMem == 0 && (lastChunk == null || diskOffset != tailOffset()))
+                    return 0;
+
+                if (needMoreMem > 0) {
+                    MemChunk chunk = new MemChunk();
+                    chunk.diskOffset = diskOffset;
+                    chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+                    if (chunkCount == 0) {
+                        firstChunk = lastChunk = chunk;
+                    } else {
+                        lastChunk.next = chunk;
+                        lastChunk = chunk;
+                    }
+                    chunkCount++;
+                }
+
+                int n = Math.min(lastChunk.freeSpace(), length);
+                System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+                lastChunk.length += n;
+
+                if (n > 0)
+                    asyncFlush(lastChunk, diskOffset, n);
+
+                return n;
+            }
+        }
+
+        /**
+         * Records where the flusher should resume, and starts the flusher thread if it is not running yet.
+         * It is called from write() while holding this MemPart's monitor. The {@code n} argument is not used.
+         */
+        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+            // flusher has caught up (or never ran): resume at the bytes just appended
+            if (asyncFlushChunk == null) {
+                asyncFlushChunk = lastChunk;
+                asyncFlushDiskOffset = diskOffset;
+            }
+
+            if (asyncFlusher == null) {
+                asyncFlusher = new Thread() {
+                    public void run() {
+                        asyncFlushException = null;
+                        logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+                        try {
+                            // poll every 10 ms while writes are accepted, then do a final flush
+                            while (writeActivated) {
+                                flushToDisk();
+                                Thread.sleep(10);
+                            }
+                            flushToDisk();
+                        } catch (Throwable ex) {
+                            // handed over to finishAsyncFlush(), which rethrows it after join()
+                            asyncFlushException = ex;
+                        }
+                        logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+                    }
+                };
+                asyncFlusher.start();
+            }
+        }
+
+        /**
+         * Copies cached bytes from asyncFlushChunk onward to diskPart until the flusher has caught up with write().
+         * The flush position is read and advanced under the memPart lock. The disk write itself happens outside
+         * the lock, and covers only bytes below the chunk length observed under the lock.
+         */
+        private void flushToDisk() throws IOException {
+            byte[] data;
+            int offset = 0;
+            int length = 0;
+            int flushedLen = 0;
+
+            while (true) {
+                data = null;
+                synchronized (memPart) {
+                    asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+                    // logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
+                    // current chunk fully flushed: move on; becomes null when there is no next chunk yet
+                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+                        asyncFlushChunk = asyncFlushChunk.next;
+                    }
+                    if (asyncFlushChunk != null) {
+                        data = asyncFlushChunk.data;
+                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+                        length = asyncFlushChunk.length - offset;
+                    }
+                }
+
+                if (data == null)
+                    break;
+
+                flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
+            }
+        }
+
+        /**
+         * Stops further in-memory writes and waits for the async flusher thread (if one was
+         * started) to exit, then resets the flusher state.
+         *
+         * <p>If the calling thread is interrupted while waiting, it keeps waiting until the
+         * flusher has actually exited. Returning early would reset {@code asyncFlushChunk}
+         * while the flusher may still be reading it, and could miss a failure recorded in
+         * {@code asyncFlushException}. The interrupt status is restored before returning.
+         *
+         * @throws IOException if the flusher thread failed; non-IO failures are wrapped
+         */
+        public void finishAsyncFlush() throws IOException {
+            deactivateMemWrite(); // lets the flusher's polling loop end with a final flush
+            if (asyncFlusher == null)
+                return;
+
+            boolean interrupted = false;
+            try {
+                while (true) {
+                    try {
+                        asyncFlusher.join();
+                        break;
+                    } catch (InterruptedException e) {
+                        logger.warn("", e);
+                        interrupted = true; // remember the interrupt and keep waiting
+                    }
+                }
+                asyncFlusher = null;
+                asyncFlushChunk = null;
+
+                // join() makes the flusher's write to asyncFlushException visible here
+                if (asyncFlushException != null) {
+                    if (asyncFlushException instanceof IOException)
+                        throw (IOException) asyncFlushException;
+                    else
+                        throw new IOException(asyncFlushException);
+                }
+            } finally {
+                if (interrupted)
+                    Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Releases in-memory chunks from the head of the chunk list, MEM_CHUNK_SIZE_MB at a
+         * time, until at least {@code mb} MB have been released or no more can be. It stops
+         * upon reaching the chunk the async flusher is positioned on ({@code asyncFlushChunk}),
+         * so that chunk and everything after it stay in memory. Presumably invoked by the
+         * memory budget controller when memory is tight.
+         *
+         * <p>NOTE(review): this locks {@code this}, while flushToDisk() locks {@code memPart};
+         * presumably they are the same object -- confirm.
+         *
+         * @param mb the number of MB requested
+         * @return MB actually released, a multiple of MEM_CHUNK_SIZE_MB (may be less or more than mb)
+         */
+        @Override
+        public synchronized int freeUp(int mb) {
+            int released = 0;
+            while (released < mb && chunkCount > 0 && firstChunk != asyncFlushChunk) {
+                released += MEM_CHUNK_SIZE_MB;
+                chunkCount--;
+                if (chunkCount > 0) {
+                    // unlink the head chunk so it can be garbage collected
+                    MemChunk detached = firstChunk;
+                    firstChunk = detached.next;
+                    detached.next = null;
+                } else {
+                    lastChunk = null;
+                    firstChunk = null;
+                }
+            }
+            return released;
+        }
+
+        /** Sets the writeActivated flag; the async flusher thread keeps polling while it is set. */
+        public void activateMemWrite() {
+            writeActivated = true;
+            final String msg = GTMemDiskStore.this + " mem write activated";
+            logger.debug(msg);
+        }
+
+        /** Clears the writeActivated flag, which ends the async flusher's polling loop. */
+        public void deactivateMemWrite() {
+            writeActivated = false;
+            final String msg = GTMemDiskStore.this + " mem write de-activated";
+            logger.debug(msg);
+        }
+
+        /**
+         * Drops all in-memory chunks. It first calls budgetCtrl.reserve(this, 0), which
+         * presumably releases this part's memory reservation.
+         */
+        public synchronized void clear() {
+            budgetCtrl.reserve(this, 0);
+            chunkCount = 0;
+            lastChunk = null;
+            firstChunk = null;
+        }
+
+        /**
+         * Finishes any pending async flush, then drops all in-memory chunks via clear().
+         * clear() runs even when the flush fails, so the chunks and the budget reservation
+         * are not leaked; the flush failure is still propagated to the caller.
+         */
+        @Override
+        public void close() throws IOException {
+            try {
+                finishAsyncFlush();
+            } finally {
+                clear();
+            }
+        }
+
+        /** Returns the enclosing store's string form, so log lines from this part name the store. */
+        @Override
+        public String toString() {
+            return String.valueOf(GTMemDiskStore.this);
+        }
+
+    }
+
+    /**
+     * File-backed part of the store. Reads and writes are positional (FileChannel
+     * read/write at an explicit offset). tailOffset holds the file length seen when the
+     * part was created or a channel was opened, and is raised by each write to the end
+     * of the written range.
+     *
+     * <p>No method here is synchronized; callers presumably coordinate access (TODO confirm).
+     */
+    private class DiskPart implements Closeable {
+        final File diskFile;
+        FileChannel writeChannel;
+        FileChannel readChannel;
+        long tailOffset;
+
+        DiskPart(File diskFile) throws IOException {
+            this.diskFile = diskFile;
+            this.tailOffset = diskFile.length();
+            logger.debug(GTMemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+        }
+
+        /** Opens a read channel on the disk file and refreshes tailOffset from the file length. */
+        public void openRead() throws IOException {
+            readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+            tailOffset = diskFile.length();
+        }
+
+        /**
+         * Reads up to {@code length} bytes at file position {@code diskOffset} into
+         * {@code bytes} starting at {@code offset}.
+         *
+         * @return the number of bytes read (possibly fewer than requested), or -1 at end of file
+         */
+        public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+        }
+
+        /** Closes the read channel, if open. */
+        public void closeRead() throws IOException {
+            if (readChannel != null) {
+                readChannel.close();
+                readChannel = null;
+            }
+        }
+
+        /**
+         * Opens the write channel.
+         *
+         * @param append if true, keep the existing file content and set tailOffset to its
+         *               length; otherwise delete the file and start a new, empty one
+         */
+        public void openWrite(boolean append) throws IOException {
+            if (append) {
+                // No StandardOpenOption.APPEND: every write is positional, and on Linux pwrite()
+                // on an O_APPEND descriptor ignores the offset and always appends, which would
+                // put bytes somewhere other than where tailOffset bookkeeping says they are.
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+                tailOffset = diskFile.length();
+            } else {
+                diskFile.delete();
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+                tailOffset = 0;
+            }
+        }
+
+        /**
+         * Writes {@code length} bytes of {@code bytes} (from {@code offset}) at file position
+         * {@code diskOffset} and extends tailOffset to cover them.
+         *
+         * @return the number of bytes actually written (may be fewer than requested)
+         */
+        public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+            tailOffset = Math.max(diskOffset + n, tailOffset);
+            return n;
+        }
+
+        /** Closes the write channel, if open. */
+        public void closeWrite() throws IOException {
+            if (writeChannel != null) {
+                writeChannel.close();
+                writeChannel = null;
+            }
+        }
+
+        /** Deletes the disk file (the result of delete() is ignored) and resets tailOffset to 0. */
+        public void clear() throws IOException {
+            diskFile.delete();
+            tailOffset = 0;
+        }
+
+        /** Closes both channels; the read channel is closed even if closing the write channel fails. */
+        @Override
+        public void close() throws IOException {
+            try {
+                closeWrite();
+            } finally {
+                closeRead();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index af47b89..374f497 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -302,7 +302,7 @@ public class DictGridTableTest {
         builder.setColumns( //
                 DataType.getInstance("timestamp"), //
                 DataType.getInstance("integer"), //
-                DataType.getInstance("varchar"), //
+                DataType.getInstance("varchar(10)"), //
                 DataType.getInstance("bigint"), //
                 DataType.getInstance("decimal") //
         );

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
new file mode 100644
index 0000000..b7678b8
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -0,0 +1,74 @@
+package org.apache.kylin.storage.gridtable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for GTMemDiskStore: write mock records through a GridTable backed by
+ * the store, scan them back, and compare with the input, under a shared 20 MB budget.
+ */
+public class GTMemDiskStoreTest {
+
+    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+    final GTInfo info = SimpleGridTableTest.advancedInfo();
+    final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadWriteRead() throws IOException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead();
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    /**
+     * Runs several write/read round trips concurrently, each on its own store but sharing
+     * the memory budget. A failure in any worker thread (including an assertion error) is
+     * captured and re-thrown on the test thread; otherwise JUnit would never see it.
+     */
+    @Test
+    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+
+        int nThreads = 5;
+        Thread[] t = new Thread[nThreads];
+        final Throwable[] errors = new Throwable[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int idx = i;
+            t[i] = new Thread() {
+                public void run() {
+                    try {
+                        verifyOneTableWriteAndRead();
+                    } catch (Throwable ex) {
+                        errors[idx] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+
+        // join() makes each worker's write to errors[] visible here
+        for (int i = 0; i < nThreads; i++) {
+            if (errors[i] != null)
+                throw new AssertionError("worker thread " + i + " failed", errors[i]);
+        }
+    }
+
+    private void verifyOneTableWriteAndRead() throws IOException {
+        GTMemDiskStore store = new GTMemDiskStore(info, budgetCtrl);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table);
+    }
+
+    /** Rebuilds the table from {@code data}, then checks a full scan returns exactly the same rows in order. */
+    private void verifyWriteAndRead(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        IGTScanner scanner = table.scan(new GTScanRequest(info));
+        int i = 0;
+        try {
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
+        } finally {
+            scanner.close();
+        }
+        // without this, a scan that stops early would pass unnoticed
+        assertEquals(data.size(), i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4631e40a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 7639f12..05e61f8 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -4,9 +4,12 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.storage.gridtable.GTInfo.Builder;
 import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
@@ -115,56 +118,81 @@ public class SimpleGridTableTest {
     }
 
     static GTBuilder rebuild(GridTable table) throws IOException {
-        GTRecord r = new GTRecord(table.getInfo());
         GTBuilder builder = table.rebuild();
-
-        builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        for (GTRecord rec : mockupData(table.getInfo(), 10)) {
+            builder.write(rec);
+        }
         builder.close();
 
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
         return builder;
     }
+    
+    static List<GTRecord> mockupData(GTInfo info, int nRows) {
+        List<GTRecord> result = new ArrayList<GTRecord>(nRows);
+        int round = nRows / 10;
+        for (int i = 0; i < round; i++) {
+            String d_01_14 = datePlus("2015-01-14", i * 4);
+            String d_01_15 = datePlus("2015-01-15", i * 4);
+            String d_01_16 = datePlus("2015-01-16", i * 4);
+            String d_01_17 = datePlus("2015-01-17", i * 4);
+            result.add(newRec(info, d_01_14, "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_14, "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_17, "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        }
+        return result;
+    }
+    
+    private static String datePlus(String date, int plusDays) {
+        long millis = DateFormat.stringToMillis(date);
+        millis += (1000L * 3600L * 24L) * plusDays;
+        return DateFormat.formatToDateStr(millis);
+    }
+
+    /** Builds a new, independent GTRecord holding the given row values. */
+    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
+        return new GTRecord(info).setValues(date, name, category, amount, price);
+    }
 
     static void rebuildViaAppend(GridTable table) throws IOException {
-        GTRecord r = new GTRecord(table.getInfo());
+        List<GTRecord> data = mockupData(table.getInfo(), 10);
         GTBuilder builder;
+        int i = 0;
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-14", "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-14", "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-15", "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-16", "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
 
         builder = table.append();
-        builder.write(r.setValues("2015-01-17", "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(data.get(i++));
         builder.close();
         System.out.println("Written Row Block Count: " + builder.getWrittenRowBlockCount());
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
@@ -188,9 +216,9 @@ public class SimpleGridTableTest {
         Builder builder = GTInfo.builder();
         builder.setCodeSystem(new GTSampleCodeSystem());
         builder.setColumns( //
-                DataType.getInstance("varchar"), //
-                DataType.getInstance("varchar"), //
-                DataType.getInstance("varchar"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
                 DataType.getInstance("bigint"), //
                 DataType.getInstance("decimal") //
         );