Posted to commits@kylin.apache.org by li...@apache.org on 2015/05/28 02:46:35 UTC

incubator-kylin git commit: MemDiskStore coding done, pending test

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-786 7945a1e49 -> 294cb8097


MemDiskStore coding done, pending test


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/294cb809
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/294cb809
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/294cb809

Branch: refs/heads/KYLIN-786
Commit: 294cb809732d407a3b5c382a64646cf527f7c62d
Parents: 7945a1e
Author: Yang Li <li...@apache.org>
Authored: Thu May 28 08:46:11 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Thu May 28 08:46:11 2015 +0800

----------------------------------------------------------------------
 .../common/util/MemoryBudgetController.java     |   7 +-
 .../kylin/storage/gridtable/GTRowBlock.java     |  88 ++--
 .../gridtable/memstore/GTMemDiskStore.java      | 509 +++++++++++++++++++
 .../gridtable/memstore/MemDiskStore.java        | 357 -------------
 .../storage/gridtable/DictGridTableTest.java    |   2 +-
 .../storage/gridtable/GTMemDiskStoreTest.java   |   5 +
 .../storage/gridtable/SimpleGridTableTest.java  |   6 +-
 7 files changed, 580 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
index dab5607..36a0998 100644
--- a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
+++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java
@@ -52,6 +52,10 @@ public class MemoryBudgetController {
     public int getTotalReservedMB() {
         return totalReservedMB;
     }
+    
+    public int getRemainingBudgetMB() {
+        return totalBudgetMB - totalReservedMB;
+    }
 
     public void reserve(MemoryConsumer consumer, int requestMB) {
         ConsumerEntry entry = booking.get(consumer);
@@ -78,6 +82,7 @@ public class MemoryBudgetController {
         if (delta < 0) {
             this.notify();
         }
+        logger.debug(entry + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB");
     }
 
     synchronized private void checkFreeMemoryAndUpdateBooking(ConsumerEntry consumer, int delta) {
@@ -104,7 +109,7 @@ public class MemoryBudgetController {
                 break;
 
             try {
-                logger.debug("Memory budget has " + (totalBudgetMB - totalReservedMB) + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
+                logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong.");
                 this.wait(200);
             } catch (InterruptedException e) {
                 logger.error("Interrupted while wait free memory", e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 4a68659..eed5698 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.gridtable;
 
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -40,36 +41,36 @@ public class GTRowBlock {
             this.cellBlocks[i] = new ByteArray();
         }
     }
-    
+
     public int getSequenceId() {
         return seqId;
     }
-    
+
     public ByteArray getPrimaryKey() {
         return primaryKey;
     }
-    
+
     public ByteArray getCellBlock(int i) {
         return cellBlocks[i];
     }
-    
+
     public Writer getWriter() {
         return new Writer();
     }
 
     public class Writer {
         ByteBuffer[] cellBlockBuffers;
-        
+
         Writer() {
             cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
             for (int i = 0; i < cellBlockBuffers.length; i++) {
                 cellBlockBuffers[i] = cellBlocks[i].asBuffer();
             }
         }
-        
+
         public void copyFrom(GTRowBlock other) {
             assert info == other.info;
-            
+
             seqId = other.seqId;
             nRows = other.nRows;
             primaryKey.copyFrom(other.primaryKey);
@@ -89,7 +90,7 @@ public class GTRowBlock {
             }
             nRows++;
         }
-        
+
         public void readyForFlush() {
             for (int i = 0; i < cellBlocks.length; i++) {
                 cellBlocks[i].setLength(cellBlockBuffers[i].position());
@@ -104,21 +105,21 @@ public class GTRowBlock {
             }
         }
     }
-    
+
     public Reader getReader() {
         return new Reader(info.colBlocksAll);
     }
-    
+
     public Reader getReader(BitSet selectedColBlocks) {
         return new Reader(selectedColBlocks);
     }
-    
+
     public class Reader {
         int cur;
         ByteBuffer primaryKeyBuffer;
         ByteBuffer[] cellBlockBuffers;
         BitSet selectedColBlocks;
-        
+
         Reader(BitSet selectedColBlocks) {
             primaryKeyBuffer = primaryKey.asBuffer();
             cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
@@ -127,15 +128,15 @@ public class GTRowBlock {
             }
             this.selectedColBlocks = selectedColBlocks;
         }
-        
+
         public boolean hasNext() {
             return cur < nRows;
         }
-        
+
         public void fetchNext(GTRecord result) {
             if (hasNext() == false)
                 throw new IllegalArgumentException();
-            
+
             for (int c = selectedColBlocks.nextSetBit(0); c >= 0; c = selectedColBlocks.nextSetBit(c + 1)) {
                 result.loadCellBlock(c, cellBlockBuffers[c]);
             }
@@ -153,7 +154,7 @@ public class GTRowBlock {
 
         return copy;
     }
-    
+
     public boolean isEmpty() {
         return nRows == 0;
     }
@@ -164,7 +165,7 @@ public class GTRowBlock {
         else
             return nRows > 0;
     }
-    
+
     public int getNumberOfRows() {
         return nRows;
     }
@@ -172,36 +173,41 @@ public class GTRowBlock {
     public void setNumberOfRows(int nRows) {
         this.nRows = nRows;
     }
-
-    // TODO export / load should optimize for disabled row block
     
+    // ============================================================================
+
     public int exportLength() {
-        int len = 4 + 4 + (4 + primaryKey.length());
+        int len = 4; // seq Id
+        if (info.isRowBlockEnabled())
+            len += 4; // nRows
+        len += 4 + primaryKey.length(); // PK byte array
         for (ByteArray array : cellBlocks) {
-            len += 4 + array.length();
+            len += 4 + array.length(); // cell block byte array
         }
         return len;
     }
 
-    public void export(DataOutputStream dataOutputStream) throws IOException {
-        dataOutputStream.writeInt(seqId);
-        dataOutputStream.writeInt(nRows);
-        export(dataOutputStream, primaryKey);
+    /** write data to given output stream, like serialize */
+    public void export(DataOutputStream out) throws IOException {
+        out.writeInt(seqId);
+        if (info.isRowBlockEnabled())
+            out.writeInt(nRows);
+        export(out, primaryKey);
         for (ByteArray cb : cellBlocks) {
-            export(dataOutputStream, cb);
+            export(out, cb);
         }
     }
 
-    private void export(DataOutputStream dataOutputStream, ByteArray array) throws IOException {
-        dataOutputStream.writeInt(array.length());
-        dataOutputStream.write(array.array(), array.offset(), array.length());
+    private void export(DataOutputStream out, ByteArray array) throws IOException {
+        out.writeInt(array.length());
+        out.write(array.array(), array.offset(), array.length());
     }
 
-
     /** write data to given buffer, like serialize */
     public void export(ByteBuffer buf) {
         buf.putInt(seqId);
-        buf.putInt(nRows);
+        if (info.isRowBlockEnabled())
+            buf.putInt(nRows);
         export(primaryKey, buf);
         for (ByteArray cb : cellBlocks) {
             export(cb, buf);
@@ -212,11 +218,29 @@ public class GTRowBlock {
         buf.putInt(array.length());
         buf.put(array.array(), array.offset(), array.length());
     }
+    
+    /** read data from given input stream, like deserialize */
+    public void importFrom(DataInputStream in) throws IOException {
+        seqId = in.readInt();
+        nRows = info.isRowBlockEnabled() ? in.readInt() : 1;
+        importFrom(in, primaryKey);
+        for (int i = 0; i < info.colBlocks.length; i++) {
+            ByteArray cb = cellBlocks[i];
+            importFrom(in, cb);
+        }
+    }
+
+    private void importFrom(DataInputStream in, ByteArray result) throws IOException {
+        byte[] data = result.array();
+        int len = in.readInt();
+        in.read(data, 0, len);
+        result.set(data, 0, len);
+    }
 
     /** change pointers to point to data in given buffer, UNLIKE deserialize */
     public void load(ByteBuffer buf) {
         seqId = buf.getInt();
-        nRows = buf.getInt();
+        nRows = info.isRowBlockEnabled() ? buf.getInt() : 1;
         load(primaryKey, buf);
         for (int i = 0; i < info.colBlocks.length; i++) {
             ByteArray cb = cellBlocks[i];
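
For reference, a round-trip sketch of the stream export/import added above. It assumes
a GTInfo and a filled GTRowBlock are already built; the roundTrip helper itself is
hypothetical, while GTRowBlock.allocate(info) is taken from its use elsewhere in this commit.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical helper, for illustration only.
    static GTRowBlock roundTrip(GTInfo info, GTRowBlock block) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bout);
        block.export(out);   // seqId, nRows (only if the row block is enabled), PK, then each cell block
        out.flush();

        GTRowBlock copy = GTRowBlock.allocate(info);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bout.toByteArray()));
        copy.importFrom(in); // reads the same layout back into the pre-allocated byte arrays
        return copy;
    }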

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
new file mode 100644
index 0000000..dfa9cd8
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTMemDiskStore.java
@@ -0,0 +1,509 @@
+package org.apache.kylin.storage.gridtable.memstore;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GTMemDiskStore implements IGTStore, Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTMemDiskStore.class);
+    private static final int STREAM_BUFFER_SIZE = 8192;
+    private static final int MEM_CHUNK_SIZE_MB = 1;
+
+    final GTInfo info;
+    final MemPart memPart;
+    final DiskPart diskPart;
+
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+        this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+    }
+
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+        this(info, budgetCtrl, diskFile, false);
+    }
+
+    public GTMemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+        this.info = info;
+        this.memPart = new MemPart(budgetCtrl);
+        this.diskPart = new DiskPart(diskFile, delOnClose);
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return new Writer(0);
+    }
+
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return new Writer(diskPart.tailOffset);
+    }
+
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return new Reader();
+    }
+
+    @Override
+    public void close() throws IOException {
+        memPart.close();
+        diskPart.close();
+    }
+
+    private class Reader implements IGTStoreScanner {
+
+        final DataInputStream din;
+        long diskOffset = 0;
+        long memRead = 0;
+        long diskRead = 0;
+
+        GTRowBlock block = GTRowBlock.allocate(info);
+        GTRowBlock next = null;
+
+        Reader() {
+            logger.debug(GTMemDiskStore.this + " read start @ " + diskOffset);
+
+            InputStream in = new InputStream() {
+                byte[] tmp = new byte[1];
+                MemChunk memChunk;
+
+                @Override
+                public int read() throws IOException {
+                    int n = read(tmp, 0, 1);
+                    if (n <= 0)
+                        return -1;
+                    else
+                        return (int) tmp[0];
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    if (available() <= 0)
+                        return -1;
+
+                    if (memChunk == null && memPart.headOffset() <= diskOffset && diskOffset < memPart.tailOffset()) {
+                        memChunk = memPart.seekMemChunk(diskOffset);
+                    }
+
+                    int lenToGo = Math.min(available(), len);
+
+                    int nRead = 0;
+                    while (lenToGo > 0) {
+                        int n;
+                        if (memChunk != null) {
+                            if (memChunk.headOffset() > diskOffset) {
+                                memChunk = null;
+                                continue;
+                            }
+                            if (diskOffset >= memChunk.tailOffset()) {
+                                memChunk = memChunk.next;
+                                continue;
+                            }
+                            int chunkOffset = (int) (diskOffset - memChunk.headOffset());
+                            n = Math.min((int) (memChunk.tailOffset() - diskOffset), lenToGo);
+                            System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+                            memRead += n;
+                        } else {
+                            n = diskPart.read(diskOffset, b, off, lenToGo);
+                            diskRead += n;
+                        }
+                        lenToGo -= n;
+                        nRead += n;
+                        diskOffset += n;
+                    }
+                    return nRead;
+                }
+
+                @Override
+                public int available() throws IOException {
+                    return (int) (diskPart.tailOffset - diskOffset);
+                }
+            };
+
+            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            try {
+                if (din.available() > 0) {
+                    block.importFrom(din);
+                    next = block;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            return next != null;
+        }
+
+        @Override
+        public GTRowBlock next() {
+            if (next == null) {
+                hasNext();
+                if (next == null)
+                    throw new NoSuchElementException();
+            }
+            GTRowBlock r = next;
+            next = null;
+            return r;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            din.close();
+            logger.debug(GTMemDiskStore.this + " read end @ " + diskOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk");
+        }
+
+    }
+
+    private class Writer implements IGTStoreWriter {
+
+        final DataOutputStream dout;
+        long diskOffset;
+
+        Writer(long startOffset) throws IOException {
+            diskOffset = startOffset;
+            memPart.clear();
+            diskPart.clear();
+            logger.debug(GTMemDiskStore.this + " write start @ " + diskOffset);
+
+            memPart.activateMemWrite();
+
+            OutputStream out = new OutputStream() {
+                byte[] tmp = new byte[1];
+                boolean memPartActivated = true;
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    while (length > 0) {
+                        if (memPartActivated) {
+                            int n = memPart.write(bytes, offset, length, diskOffset);
+                            offset += n;
+                            length -= n;
+                            diskOffset += n;
+                            if (n == 0) {
+                                memPartActivated = false;
+                            }
+                        } else {
+                            diskPart.write(diskOffset, bytes, offset, length);
+                            diskOffset += length;
+                        }
+                    }
+                }
+            };
+            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        @Override
+        public void close() throws IOException {
+            dout.close();
+            memPart.finishAsyncFlush();
+            assert diskOffset == diskPart.tailOffset;
+            logger.debug(GTMemDiskStore.this + " write end @ " + diskOffset);
+        }
+    }
+
+    private static class MemChunk {
+        long diskOffset;
+        int length;
+        byte[] data;
+        MemChunk next;
+
+        boolean isFull() {
+            return length == data.length;
+        }
+
+        long headOffset() {
+            return diskOffset;
+        }
+
+        long tailOffset() {
+            return diskOffset + length;
+        }
+
+        int freeSpace() {
+            return data.length - length;
+        }
+    }
+
+    private class MemPart implements Closeable, MemoryConsumer {
+
+        final MemoryBudgetController budgetCtrl;
+
+        // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
+        volatile boolean writeActivated;
+        MemChunk firstChunk;
+        MemChunk lastChunk;
+        int chunkCount;
+
+        Thread asyncFlusher;
+        MemChunk asyncFlushChunk;
+        long asyncFlushDiskOffset;
+        Throwable asyncFlushException;
+
+        MemPart(MemoryBudgetController budgetCtrl) {
+            this.budgetCtrl = budgetCtrl;
+        }
+
+        long headOffset() {
+            return firstChunk == null ? 0 : firstChunk.headOffset();
+        }
+
+        long tailOffset() {
+            return lastChunk == null ? 0 : lastChunk.tailOffset();
+        }
+
+        synchronized public MemChunk seekMemChunk(long diskOffset) {
+            MemChunk c = firstChunk;
+            while (c != null && c.headOffset() <= diskOffset) {
+                if (c.tailOffset() < diskOffset)
+                    break;
+            }
+            return c;
+        }
+
+        synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            if (writeActivated == false)
+                return 0;
+
+            // write is only expected at the tail
+            if (diskOffset != tailOffset())
+                return 0;
+
+            if (chunkCount == 0 || lastChunk.isFull()) {
+                allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
+                if (writeActivated == false)
+                    return 0;
+            }
+
+            int n = Math.min(lastChunk.freeSpace(), length);
+            System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+            lastChunk.length += n;
+
+            asyncFlush(lastChunk, diskOffset, n);
+
+            return n;
+        }
+
+        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+            if (asyncFlushChunk == null) {
+                asyncFlushChunk = lastChunk;
+                asyncFlushDiskOffset = diskOffset;
+            }
+
+            if (asyncFlusher == null) {
+                asyncFlusher = new Thread() {
+                    public void run() {
+                        asyncFlushException = null;
+                        logger.debug(GTMemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+                        try {
+                            while (writeActivated) {
+                                flushToDisk();
+                                Thread.sleep(10);
+                            }
+                            flushToDisk();
+                        } catch (Throwable ex) {
+                            asyncFlushException = ex;
+                        }
+                        logger.debug(GTMemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+                    }
+                };
+                asyncFlusher.start();
+            }
+        }
+
+        private void flushToDisk() throws IOException {
+            byte[] data;
+            int offset = 0;
+            int length = 0;
+
+            while (true) {
+                data = null;
+                synchronized (memPart) {
+                    asyncFlushDiskOffset += length; // bytes written in last loop
+                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+                        asyncFlushChunk = asyncFlushChunk.next;
+                    }
+                    if (asyncFlushChunk != null) {
+                        data = asyncFlushChunk.data;
+                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+                        length = asyncFlushChunk.length - offset;
+                    }
+                }
+
+                if (data == null)
+                    break;
+
+                diskPart.write(asyncFlushDiskOffset, data, offset, length);
+            }
+        }
+
+        private void allocateNewMemChunk(long diskOffset) {
+            try {
+                budgetCtrl.reserve(this, (chunkCount + 1) * MEM_CHUNK_SIZE_MB);
+            } catch (NotEnoughBudgetException ex) {
+                deactivateMemWrite();
+                return;
+            }
+
+            MemChunk chunk = new MemChunk();
+            chunk.diskOffset = diskOffset;
+            chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+            if (chunkCount == 0) {
+                firstChunk = lastChunk = chunk;
+            } else {
+                lastChunk.next = chunk;
+                lastChunk = chunk;
+            }
+            chunkCount++;
+        }
+
+        synchronized public void finishAsyncFlush() throws IOException {
+            deactivateMemWrite();
+            if (asyncFlusher != null) {
+                try {
+                    asyncFlusher.join();
+                } catch (InterruptedException e) {
+                    logger.warn("", e);
+                }
+                asyncFlusher = null;
+                asyncFlushChunk = null;
+
+                if (asyncFlushException != null) {
+                    if (asyncFlushException instanceof IOException)
+                        throw (IOException) asyncFlushException;
+                    else
+                        throw new IOException(asyncFlushException);
+                }
+            }
+        }
+
+        @Override
+        synchronized public int freeUp(int mb) {
+            int mbReleased = 0;
+            while (chunkCount > 0 && mbReleased < mb) {
+                if (firstChunk == asyncFlushChunk)
+                    break;
+
+                mbReleased += MEM_CHUNK_SIZE_MB;
+                chunkCount--;
+                if (chunkCount == 0) {
+                    firstChunk = lastChunk = null;
+                } else {
+                    MemChunk next = firstChunk.next;
+                    firstChunk.next = null;
+                    firstChunk = next;
+                }
+            }
+            return mbReleased;
+        }
+
+        public void activateMemWrite() {
+            writeActivated = true;
+            logger.debug(this + " mem write activated");
+        }
+
+        public void deactivateMemWrite() {
+            writeActivated = false;
+            logger.debug(this + " mem write de-activated");
+        }
+
+        synchronized public void clear() {
+            budgetCtrl.reserve(this, 0);
+            chunkCount = 0;
+            firstChunk = lastChunk = null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            finishAsyncFlush();
+            clear();
+        }
+
+    }
+
+    private class DiskPart implements Closeable {
+        final FileChannel channel;
+        long tailOffset;
+
+        DiskPart(File diskFile, boolean delOnClose) throws IOException {
+            HashSet<StandardOpenOption> opts = new HashSet<StandardOpenOption>(10);
+            opts.add(StandardOpenOption.CREATE);
+            opts.add(StandardOpenOption.APPEND);
+            opts.add(StandardOpenOption.WRITE);
+            opts.add(StandardOpenOption.READ);
+            if (delOnClose)
+                opts.add(StandardOpenOption.DELETE_ON_CLOSE);
+            this.channel = FileChannel.open(diskFile.toPath(), opts);
+        }
+
+        public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            return channel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+        }
+
+        void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            channel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+            tailOffset = Math.max(diskOffset + length, tailOffset);
+        }
+
+        void clear() throws IOException {
+            tailOffset = 0;
+            channel.truncate(0);
+        }
+
+        @Override
+        public void close() throws IOException {
+            channel.close();
+        }
+    }
+
+}
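
For context, a sketch of the intended write-then-scan path through the new store. A GTInfo
and a filled GTRowBlock are assumed to exist, IGTStoreWriter/IGTStoreScanner are written as
nested types of IGTStore (as the unqualified names above suggest), and the
MemoryBudgetController constructor taking a budget in MB is an assumption. The scan arguments
are passed as null only because this Reader streams whole row blocks and ignores them.

    import org.apache.kylin.common.util.MemoryBudgetController;
    import org.apache.kylin.storage.gridtable.GTInfo;
    import org.apache.kylin.storage.gridtable.GTRowBlock;
    import org.apache.kylin.storage.gridtable.IGTStore;
    import org.apache.kylin.storage.gridtable.memstore.GTMemDiskStore;

    // Hypothetical walk-through, for illustration only.
    static void writeThenScan(GTInfo info, GTRowBlock rowBlock) throws Exception {
        MemoryBudgetController budgetCtrl = new MemoryBudgetController(16); // assumed ctor: budget in MB
        GTMemDiskStore store = new GTMemDiskStore(info, budgetCtrl);        // temp file, deleted on close

        IGTStore.IGTStoreWriter writer = store.rebuild(0);
        writer.write(rowBlock); // buffered through MemPart; the async flusher mirrors it to DiskPart
        writer.close();         // finishAsyncFlush() makes sure everything reached the disk file

        IGTStore.IGTStoreScanner scanner = store.scan(null, null, null, null); // args ignored by this Reader
        int rows = 0;
        while (scanner.hasNext()) {
            rows += scanner.next().getNumberOfRows(); // served from memory chunks when present, else from disk
        }
        scanner.close();
        store.close();
        System.out.println("rows read back: " + rows);
    }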

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
deleted file mode 100644
index b5a58ca..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/MemDiskStore.java
+++ /dev/null
@@ -1,357 +0,0 @@
-package org.apache.kylin.storage.gridtable.memstore;
-
-import static org.apache.kylin.common.util.MemoryBudgetController.*;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.BitSet;
-
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemDiskStore implements IGTStore {
-
-    private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
-
-    final GTInfo info;
-    final MemPart memPart;
-    final DiskPart diskPart;
-
-    public MemDiskStore(GTInfo info, File diskFile, MemoryBudgetController budgetCtrl) throws IOException {
-        this.info = info;
-        this.memPart = new MemPart(budgetCtrl);
-        this.diskPart = new DiskPart(diskFile);
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public IGTStoreWriter rebuild(int shard) throws IOException {
-        return new Writer(0);
-    }
-
-    @Override
-    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
-        return new Writer(Math.max(memPart.tailOffset(), diskPart.tailOffset));
-    }
-
-    @Override
-    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    private class Writer implements IGTStoreWriter {
-
-        final DataOutputStream dout;
-        boolean memPartActivated = true;
-
-        Writer(final long disktOffset) throws IOException {
-            memPart.clear();
-            diskPart.clear();
-
-            memPart.activateWrite();
-
-            OutputStream out = new OutputStream() {
-                long diskOffset = disktOffset;
-                byte[] tmp = new byte[1];
-
-                @Override
-                public void write(int b) throws IOException {
-                    tmp[0] = (byte) b;
-                    write(tmp, 0, 1);
-                }
-
-                @Override
-                public void write(byte[] bytes, int offset, int length) throws IOException {
-                    while (length > 0) {
-                        if (memPartActivated) {
-                            int n = memPart.write(bytes, offset, length, diskOffset);
-                            offset += n;
-                            length -= n;
-                            diskOffset += n;
-                            if (n == 0) {
-                                memPartActivated = false;
-                            }
-                        } else {
-                            diskPart.write(diskOffset, bytes, offset, length);
-                            diskOffset += length;
-                        }
-                    }
-                }
-            };
-            dout = new DataOutputStream(out);
-        }
-
-        @Override
-        public void write(GTRowBlock block) throws IOException {
-            block.export(dout);
-        }
-
-        @Override
-        public void close() throws IOException {
-            memPart.finishAsyncFlush();
-        }
-    }
-
-    private static class MemChunk {
-        long diskOffset;
-        int length;
-        byte[] data;
-        MemChunk next;
-
-        boolean isFull() {
-            return length == data.length;
-        }
-
-        long headOffset() {
-            return diskOffset;
-        }
-
-        long tailOffset() {
-            return diskOffset + length;
-        }
-
-        int freeSpace() {
-            return data.length - length;
-        }
-    }
-
-    private class MemPart implements Closeable, MemoryConsumer {
-
-        final MemoryBudgetController budgetCtrl;
-
-        // read & write won't go together, but write() / asyncDiskWrite() / freeUp() can happen at the same time
-        volatile boolean writeActivated;
-        MemChunk firstChunk;
-        MemChunk lastChunk;
-        int chunkCount;
-
-        Thread asyncFlusher;
-        MemChunk asyncFlushChunk;
-        long asyncFlushDiskOffset;
-        Throwable asyncFlushException;
-
-        MemPart(MemoryBudgetController budgetCtrl) {
-            this.budgetCtrl = budgetCtrl;
-        }
-
-        @SuppressWarnings("unused")
-        long headOffset() {
-            return firstChunk == null ? 0 : firstChunk.headOffset();
-        }
-
-        long tailOffset() {
-            return lastChunk == null ? 0 : lastChunk.tailOffset();
-        }
-
-        synchronized public int write(byte[] bytes, int offset, int length, long diskOffset) {
-            if (writeActivated == false)
-                return 0;
-
-            // write is only expected at the tail
-            if (diskOffset != tailOffset())
-                return 0;
-
-            if (chunkCount == 0 || lastChunk.isFull()) {
-                allocateNewMemChunk(diskOffset); // fail to allocate will deactivate MemPart
-                if (writeActivated == false)
-                    return 0;
-            }
-
-            int n = Math.min(lastChunk.freeSpace(), length);
-            System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
-            lastChunk.length += n;
-
-            asyncFlush(lastChunk, diskOffset, n);
-
-            return n;
-        }
-
-        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
-            if (asyncFlusher == null) {
-                asyncFlusher = new Thread() {
-                    public void run() {
-                        try {
-                            while (writeActivated) {
-                                synchronized (asyncFlusher) {
-                                    try {
-                                        asyncFlusher.wait();
-                                    } catch (InterruptedException e) {
-                                    }
-                                }
-                                flushToDiskPart();
-                            }
-                            flushToDiskPart();
-                        } catch (Throwable ex) {
-                            asyncFlushException = ex;
-                        }
-                    }
-                };
-                asyncFlusher.start();
-            }
-
-            if (asyncFlushChunk == null) {
-                asyncFlushChunk = lastChunk;
-                asyncFlushDiskOffset = diskOffset;
-            }
-
-            // flush in batch
-            if (diskOffset + n >= asyncFlushDiskOffset + 4096) {
-                synchronized (asyncFlusher) {
-                    asyncFlusher.notify();
-                }
-            }
-        }
-
-        private void flushToDiskPart() throws IOException {
-            byte[] data;
-            int offset = 0;
-            int length = 0;
-
-            while (true) {
-                data = null;
-                synchronized (memPart) {
-                    asyncFlushDiskOffset += length; // bytes written in last loop
-                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
-                        asyncFlushChunk = asyncFlushChunk.next;
-                    }
-                    if (asyncFlushChunk != null) {
-                        data = asyncFlushChunk.data;
-                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
-                        length = asyncFlushChunk.length - offset;
-                    }
-                }
-
-                if (data == null)
-                    break;
-
-                diskPart.write(asyncFlushDiskOffset, data, offset, length);
-            }
-        }
-
-        private void allocateNewMemChunk(long diskOffset) {
-            try {
-                budgetCtrl.reserve(this, chunkCount + 1);
-
-                MemChunk chunk = new MemChunk();
-                chunk.diskOffset = diskOffset;
-                chunk.data = new byte[ONE_MB - 48]; // -48 for MemChunk overhead
-                if (chunkCount == 0) {
-                    firstChunk = lastChunk = chunk;
-                } else {
-                    lastChunk.next = chunk;
-                    lastChunk = chunk;
-                }
-                chunkCount++;
-
-            } catch (NotEnoughBudgetException ex) {
-                deactivateWrite();
-            }
-        }
-        
-        synchronized public void finishAsyncFlush() throws IOException {
-            deactivateWrite();
-            if (asyncFlusher != null) {
-                synchronized (asyncFlusher) {
-                    asyncFlusher.notify();
-                }
-
-                try {
-                    asyncFlusher.join();
-                } catch (InterruptedException e) {
-                    logger.warn("", e);
-                }
-                asyncFlusher = null;
-
-                if (asyncFlushException != null) {
-                    if (asyncFlushException instanceof IOException)
-                        throw (IOException) asyncFlushException;
-                    else
-                        throw new IOException(asyncFlushException);
-                }
-            }
-        }
-
-        @Override
-        synchronized public int freeUp(int mb) {
-            int mbReleased = 0;
-            while (chunkCount > 0 && mbReleased < mb) {
-                if (firstChunk == asyncFlushChunk)
-                    break;
-
-                mbReleased++;
-                chunkCount--;
-                if (chunkCount == 0)
-                    firstChunk = lastChunk = null;
-                else
-                    firstChunk = firstChunk.next;
-            }
-            return mbReleased;
-        }
-        
-        void activateWrite() {
-            writeActivated = true;
-        }
-
-        void deactivateWrite() {
-            writeActivated = false;
-        }
-        
-        synchronized public void clear() {
-            chunkCount = 0;
-            firstChunk = lastChunk = null;
-        }
-
-        @Override
-        public void close() throws IOException {
-            finishAsyncFlush();
-            clear();
-        }
-
-    }
-
-    private class DiskPart implements Closeable {
-        final File diskFile;
-        final FileChannel channel;
-        long tailOffset;
-
-        DiskPart(File diskFile) throws IOException {
-            this.diskFile = diskFile;
-            this.channel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE, StandardOpenOption.READ);
-        }
-
-        void write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            channel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
-            tailOffset += length;
-        }
-
-        void clear() throws IOException {
-            tailOffset = 0;
-            channel.truncate(0);
-        }
-
-        @Override
-        public void close() throws IOException {
-            channel.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index 3607175..2851f6d 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -302,7 +302,7 @@ public class DictGridTableTest {
         builder.setColumns( //
                 DataType.getInstance("timestamp"), //
                 DataType.getInstance("integer"), //
-                DataType.getInstance("varchar"), //
+                DataType.getInstance("varchar(10)"), //
                 DataType.getInstance("bigint"), //
                 DataType.getInstance("decimal") //
         );

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
new file mode 100644
index 0000000..dd9668e
--- /dev/null
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GTMemDiskStoreTest.java
@@ -0,0 +1,5 @@
+package org.apache.kylin.storage.gridtable;
+
+public class GTMemDiskStoreTest {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/294cb809/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 7639f12..7b676f2 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -188,9 +188,9 @@ public class SimpleGridTableTest {
         Builder builder = GTInfo.builder();
         builder.setCodeSystem(new GTSampleCodeSystem());
         builder.setColumns( //
-                DataType.getInstance("varchar"), //
-                DataType.getInstance("varchar"), //
-                DataType.getInstance("varchar"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
                 DataType.getInstance("bigint"), //
                 DataType.getInstance("decimal") //
         );