You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/02 01:31:03 UTC

incubator-kylin git commit: KYLIN-806, ConcurrentDiskStore that allows concurrent readers, no obvious improvement compared to MemDiskStore. (ConcurrentDiskStoreTest)

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 4739e0c4a -> d27319b58


KYLIN-806, ConcurrentDiskStore that allows concurrent readers, no obvious
improvement compared to MemDiskStore. (ConcurrentDiskStoreTest)


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d27319b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d27319b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d27319b5

Branch: refs/heads/0.8.0
Commit: d27319b587f5b9c6e7dc1599be5f98217375c0c9
Parents: 4739e0c
Author: Yang Li <li...@apache.org>
Authored: Tue Jun 2 07:16:19 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Jun 2 07:29:06 2015 +0800

----------------------------------------------------------------------
 .../job/inmemcubing/ConcurrentDiskStore.java    | 324 ++++++++++++++++++-
 .../kylin/job/inmemcubing/InMemCubeBuilder.java |  16 +-
 .../kylin/job/inmemcubing/MemDiskStore.java     |   2 +-
 .../inmemcubing/ConcurrentDiskStoreTest.java    |  93 ++++++
 .../job/inmemcubing/GTMemDiskStoreTest.java     |  90 ------
 .../kylin/job/inmemcubing/MemDiskStoreTest.java |  96 ++++++
 .../storage/gridtable/UnitTestSupport.java      | 101 ++++++
 .../storage/gridtable/SimpleGridTableTest.java  |  78 +----
 .../gridtable/SimpleInvertedIndexTest.java      |   2 +-
 9 files changed, 632 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
index cca1040..1ba7e4d 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
@@ -17,6 +17,326 @@
 
 package org.apache.kylin.job.inmemcubing;
 
-public class ConcurrentDiskStore {
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
 
-}
+import org.apache.commons.io.IOUtils;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A disk-backed {@link IGTStore} that allows many concurrent readers or one
+ * exclusive writer at a time.
+ * <p>
+ * Row blocks are written sequentially to a single file and read back
+ * sequentially from offset 0. All readers share one read channel and use
+ * positional reads, each reader tracking its own offset. Opening a writer while
+ * any reader or writer is active, or opening a reader while a writer is active,
+ * fails with {@link IllegalStateException}.
+ */
+public class ConcurrentDiskStore implements IGTStore, Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConcurrentDiskStore.class);
+    private static final boolean debug = true;
+
+    private static final int STREAM_BUFFER_SIZE = 8192;
+
+    final private GTInfo info;
+    final private Object lock; // guards activeWriter, activeReaders and the open/close of both channels
+
+    final private File diskFile;
+    final private boolean delOnClose;
+
+    private Writer activeWriter;
+    final private HashSet<Reader> activeReaders = new HashSet<Reader>();
+    private FileChannel writeChannel;
+    private FileChannel readChannel; // sharable across multi-threads
+
+    /**
+     * Creates a store backed by a fresh temp file; the file is deleted when the store is closed.
+     *
+     * @param info the grid table info describing the stored row blocks
+     * @throws IOException if the temp file cannot be created
+     */
+    public ConcurrentDiskStore(GTInfo info) throws IOException {
+        this(info, File.createTempFile("ConcurrentDiskStore", ""), true);
+    }
+
+    /**
+     * Creates a store backed by the given file; the file is kept when the store is closed.
+     *
+     * @param info     the grid table info describing the stored row blocks
+     * @param diskFile the file that holds the data
+     */
+    public ConcurrentDiskStore(GTInfo info, File diskFile) throws IOException {
+        this(info, diskFile, false);
+    }
+
+    private ConcurrentDiskStore(GTInfo info, File diskFile, boolean delOnClose) throws IOException {
+        this.info = info;
+        this.lock = this;
+        this.diskFile = diskFile;
+        this.delOnClose = delOnClose;
+
+        // in case user forget to call close()
+        if (delOnClose)
+            diskFile.deleteOnExit();
+
+        if (debug)
+            logger.debug(this + " disk file " + diskFile.getAbsolutePath());
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * Starts an exclusive writer that replaces the whole content of the disk file.
+     * The shard argument is ignored.
+     *
+     * @throws IllegalStateException if a writer or any reader is still active
+     */
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return newWriter(0);
+    }
+
+    /**
+     * Not supported yet; always throws {@link IllegalStateException}.
+     */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        throw new IllegalStateException("does not support append yet");
+        //return newWriter(diskFile.length());
+    }
+
+    // registers the single active writer; refuses while any writer or reader is active
+    private IGTStoreWriter newWriter(long startOffset) throws IOException {
+        synchronized (lock) {
+            if (activeWriter != null || !activeReaders.isEmpty())
+                throw new IllegalStateException();
+
+            openWriteChannel(startOffset);
+            activeWriter = new Writer(startOffset);
+            return activeWriter;
+        }
+    }
+
+    // deregisters the active writer and releases the write channel
+    private void closeWriter(Writer w) {
+        synchronized (lock) {
+            if (activeWriter != w)
+                throw new IllegalStateException();
+
+            activeWriter = null;
+            closeWriteChannel();
+        }
+    }
+
+    /**
+     * Starts a scanner over all row blocks in the file. The range, column and
+     * push-down arguments are ignored; the scan always starts at offset 0.
+     *
+     * @throws IllegalStateException if a writer is still active
+     */
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return newReader();
+    }
+
+    // registers a new reader; the shared read channel is opened on first use
+    private IGTStoreScanner newReader() throws IOException {
+        synchronized (lock) {
+            if (activeWriter != null)
+                throw new IllegalStateException();
+
+            openReadChannel();
+            Reader r = new Reader(0);
+            activeReaders.add(r);
+            return r;
+        }
+    }
+
+    // deregisters a reader; the shared read channel is closed when the last reader leaves
+    private void closeReader(Reader r) throws IOException {
+        synchronized (lock) {
+            if (activeReaders.contains(r) == false)
+                throw new IllegalStateException();
+
+            activeReaders.remove(r);
+            if (activeReaders.isEmpty())
+                closeReadChannel();
+        }
+    }
+
+    /**
+     * Sequential scanner over the disk file. Uses positional reads on the shared
+     * read channel and keeps its own read offset.
+     */
+    private class Reader implements IGTStoreScanner {
+        final DataInputStream din;
+        long fileLen;
+        long readOffset;
+
+        GTRowBlock block = GTRowBlock.allocate(info);
+        GTRowBlock next = null;
+
+        Reader(long startOffset) throws IOException {
+            this.fileLen = diskFile.length();
+            this.readOffset = startOffset;
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " read start @ " + readOffset);
+
+            InputStream in = new InputStream() {
+                byte[] tmp = new byte[1];
+
+                @Override
+                public int read() throws IOException {
+                    int n = read(tmp, 0, 1);
+                    if (n <= 0)
+                        return -1;
+                    // InputStream.read() must return 0..255; a plain cast sign-extends
+                    // and would turn byte 0xFF into -1, a false end-of-stream
+                    return tmp[0] & 0xFF;
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    if (len == 0)
+                        return 0;
+
+                    int avail = available();
+                    if (avail <= 0)
+                        return -1;
+
+                    int lenToGo = Math.min(avail, len);
+                    int nRead = 0;
+                    while (lenToGo > 0) {
+                        int n = readChannel.read(ByteBuffer.wrap(b, off, lenToGo), readOffset);
+                        // the file is shorter than the length recorded at open; fail
+                        // instead of looping forever with a decreasing offset
+                        if (n < 0)
+                            throw new IOException("Unexpected end of file " + diskFile.getAbsolutePath() + " at offset " + readOffset);
+
+                        lenToGo -= n;
+                        nRead += n;
+                        off += n;
+                        readOffset += n;
+                    }
+                    return nRead;
+                }
+
+                @Override
+                public int available() throws IOException {
+                    // clamp to the int range, a plain cast overflows when more than 2 GB remain
+                    long remaining = fileLen - readOffset;
+                    if (remaining <= 0)
+                        return 0;
+                    return (int) Math.min(remaining, (long) Integer.MAX_VALUE);
+                }
+            };
+            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            try {
+                if (din.available() > 0) {
+                    // the same block object is reused for every call
+                    block.importFrom(din);
+                    next = block;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            return next != null;
+        }
+
+        @Override
+        public GTRowBlock next() {
+            if (next == null) {
+                hasNext();
+                if (next == null)
+                    throw new NoSuchElementException();
+            }
+            GTRowBlock r = next;
+            next = null;
+            return r;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                din.close();
+            } finally {
+                // always deregister, or the store would refuse writers forever
+                closeReader(this);
+            }
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " read end @ " + readOffset);
+        }
+    }
+
+    /**
+     * The exclusive writer. Exports row blocks through a buffered stream that
+     * does positional writes on the write channel.
+     */
+    private class Writer implements IGTStoreWriter {
+        final DataOutputStream dout;
+        long writeOffset;
+
+        Writer(long startOffset) {
+            this.writeOffset = startOffset;
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " write start @ " + writeOffset);
+
+            OutputStream out = new OutputStream() {
+                byte[] tmp = new byte[1];
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    // a channel write may be partial, loop until everything is out
+                    while (length > 0) {
+                        int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), writeOffset);
+                        offset += n;
+                        length -= n;
+                        writeOffset += n;
+                    }
+                }
+            };
+            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                // flushes the buffered bytes to the channel
+                dout.close();
+            } finally {
+                // always release the writer slot and the channel, even if the flush failed
+                closeWriter(this);
+            }
+
+            if (debug)
+                logger.debug(ConcurrentDiskStore.this + " write end @ " + writeOffset);
+        }
+    }
+
+    private void openWriteChannel(long startOffset) throws IOException {
+        if (startOffset > 0) { // TODO does not support append yet
+            writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+        } else {
+            // start from an empty file
+            diskFile.delete();
+            writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+        }
+    }
+
+    private void closeWriteChannel() {
+        IOUtils.closeQuietly(writeChannel);
+        writeChannel = null;
+    }
+
+    private void openReadChannel() throws IOException {
+        if (readChannel == null) {
+            readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+        }
+    }
+
+    private void closeReadChannel() throws IOException {
+        IOUtils.closeQuietly(readChannel);
+        readChannel = null;
+    }
+
+    /**
+     * Closes the store and deletes the disk file if it is a temp file created by this store.
+     *
+     * @throws IllegalStateException if a writer or any reader is still active
+     */
+    @Override
+    public void close() throws IOException {
+        synchronized (lock) {
+            if (activeWriter != null || !activeReaders.isEmpty())
+                throw new IllegalStateException();
+
+            if (delOnClose) {
+                diskFile.delete();
+            }
+
+            if (debug)
+                logger.debug(this + " closed");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ConcurrentDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index 1207466..9e98976 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -55,6 +56,7 @@ import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.GridTable;
 import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.IGTStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -156,9 +158,12 @@ public class InMemCubeBuilder implements Runnable {
 
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
-        // could use a real budget controller, but experiment shows write directly to disk is the same fast
-        // GTMemDiskStore store = new GTMemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
-        MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        
+        // The store implementations below are very similar in performance. The ConcurrentDiskStore is the simplest.
+        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+        // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        
         GridTable gridTable = new GridTable(info, store);
         return gridTable;
     }
@@ -566,7 +571,10 @@ public class InMemCubeBuilder implements Runnable {
     }
 
     private void closeStore(GridTable gt) throws IOException {
-        ((MemDiskStore) gt.getStore()).close();
+        IGTStore store = gt.getStore();
+        if (store instanceof Closeable) {
+            ((Closeable) store).close();
+        }
     }
 
     // ===========================================================================

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
index 44376e5..4e6894e 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
@@ -61,7 +61,7 @@ public class MemDiskStore implements IGTStore, Closeable {
     private Writer ongoingWriter;
 
     public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
-        this(info, budgetCtrl, File.createTempFile("GTMemDiskStore", ""), true);
+        this(info, budgetCtrl, File.createTempFile("MemDiskStore", ""), true);
     }
 
     public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
new file mode 100644
index 0000000..7aa4640
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStoreTest.java
@@ -0,0 +1,93 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes a large mock data set into a {@link ConcurrentDiskStore} once, then
+ * reads it back from one or several threads and checks every record.
+ */
+public class ConcurrentDiskStoreTest {
+
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(1);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(5);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    private void verifyOneTableWriteAndRead(int readThreads) throws IOException, InterruptedException {
+        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table, readThreads);
+    }
+
+    /**
+     * Writes all mock records, then scans them concurrently from the given
+     * number of threads. Any failure inside a reader thread fails the test.
+     */
+    private void verifyWriteAndRead(final GridTable table, int readThreads) throws IOException, InterruptedException {
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        int nThreads = readThreads;
+        // one slot per thread; join() below makes the writes visible to this thread
+        final Throwable[] failures = new Throwable[nThreads];
+        Thread[] t = new Thread[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+            t[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        scanAndVerify(table);
+                    } catch (Throwable ex) {
+                        // Throwable, because a failed assertion is an Error, not an Exception
+                        failures[threadIdx] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        // report reader failures in the test thread, otherwise the test would pass silently
+        for (int i = 0; i < nThreads; i++) {
+            if (failures[i] != null)
+                throw new AssertionError("Reader thread " + i + " failed: " + failures[i], failures[i]);
+        }
+
+        ((ConcurrentDiskStore) table.getStore()).close();
+    }
+
+    // scans the whole table and compares it record by record with the mock data
+    private void scanAndVerify(GridTable table) throws Exception {
+        IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo()));
+        try {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
+            assertEquals(data.size(), i);
+        } finally {
+            scanner.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
deleted file mode 100644
index 9d044a0..0000000
--- a/job/src/test/java/org/apache/kylin/job/inmemcubing/GTMemDiskStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import org.apache.kylin.storage.gridtable.*;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class GTMemDiskStoreTest {
-
-    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
-    final GTInfo info = SimpleGridTableTest.advancedInfo();
-    final List<GTRecord> data = SimpleGridTableTest.mockupData(info, 1000000); // converts to about 34 MB data
-
-    @Test
-    public void testSingleThreadWriteRead() throws IOException {
-        long start = System.currentTimeMillis();
-        verifyOneTableWriteAndRead();
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-
-    @Test
-    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-
-        int nThreads = 5;
-        Thread[] t = new Thread[nThreads];
-        for (int i = 0; i < nThreads; i++) {
-            t[i] = new Thread() {
-                public void run() {
-                    try {
-                        verifyOneTableWriteAndRead();
-                    } catch (Exception ex) {
-                        ex.printStackTrace();
-                    }
-                }
-            };
-            t[i].start();
-        }
-        for (int i = 0; i < nThreads; i++) {
-            t[i].join();
-        }
-
-        long end = System.currentTimeMillis();
-        System.out.println("Cost " + (end - start) + " millis");
-    }
-
-    private void verifyOneTableWriteAndRead() throws IOException {
-        MemDiskStore store = new MemDiskStore(info, budgetCtrl);
-        GridTable table = new GridTable(info, store);
-        verifyWriteAndRead(table);
-    }
-
-    private void verifyWriteAndRead(GridTable table) throws IOException {
-        GTInfo info = table.getInfo();
-
-        GTBuilder builder = table.rebuild();
-        for (GTRecord r : data) {
-            builder.write(r);
-        }
-        builder.close();
-
-        IGTScanner scanner = table.scan(new GTScanRequest(info));
-        int i = 0;
-        for (GTRecord r : scanner) {
-            assertEquals(data.get(i++), r);
-        }
-        scanner.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
new file mode 100644
index 0000000..11cbb67
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/inmemcubing/MemDiskStoreTest.java
@@ -0,0 +1,96 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.job.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes a large mock data set into a {@link MemDiskStore} and reads it back,
+ * from a single thread and from several threads that each use their own store.
+ */
+public class MemDiskStoreTest {
+
+    final MemoryBudgetController budgetCtrl = new MemoryBudgetController(20);
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadWriteRead() throws IOException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead();
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadWriteRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+
+        int nThreads = 5;
+        // one slot per thread; join() below makes the writes visible to this thread
+        final Throwable[] failures = new Throwable[nThreads];
+        Thread[] t = new Thread[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            final int threadIdx = i;
+            t[i] = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        verifyOneTableWriteAndRead();
+                    } catch (Throwable ex) {
+                        // Throwable, because a failed assertion is an Error, not an Exception
+                        failures[threadIdx] = ex;
+                    }
+                }
+            };
+            t[i].start();
+        }
+        for (int i = 0; i < nThreads; i++) {
+            t[i].join();
+        }
+
+        // report worker failures in the test thread, otherwise the test would pass silently
+        for (int i = 0; i < nThreads; i++) {
+            if (failures[i] != null)
+                throw new AssertionError("Thread " + i + " failed: " + failures[i], failures[i]);
+        }
+
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    private void verifyOneTableWriteAndRead() throws IOException {
+        MemDiskStore store = new MemDiskStore(info, budgetCtrl);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table);
+        // release the store once the round trip has been verified
+        store.close();
+    }
+
+    // writes all mock records, then scans them back and compares record by record
+    private void verifyWriteAndRead(GridTable table) throws IOException {
+        GTInfo info = table.getInfo();
+
+        GTBuilder builder = table.rebuild();
+        for (GTRecord r : data) {
+            builder.write(r);
+        }
+        builder.close();
+
+        IGTScanner scanner = table.scan(new GTScanRequest(info));
+        try {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                assertEquals(data.get(i++), r);
+            }
+            assertEquals(data.size(), i);
+        } finally {
+            scanner.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
new file mode 100644
index 0000000..4b872a6
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
@@ -0,0 +1,101 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.storage.gridtable;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.storage.gridtable.GTInfo.Builder;
+
+/**
+ * Shared fixtures for grid table tests: sample {@link GTInfo} layouts and mock records.
+ */
+public class UnitTestSupport {
+
+    /** Returns the five-column sample info without column blocks or row blocks. */
+    public static GTInfo basicInfo() {
+        return infoBuilder().build();
+    }
+
+    /** Returns the sample info with three column blocks and a row block size of 4. */
+    public static GTInfo advancedInfo() {
+        Builder advanced = infoBuilder();
+        ImmutableBitSet[] colBlocks = { setOf(0), setOf(1, 2), setOf(3, 4) };
+        advanced.enableColumnBlock(colBlocks);
+        advanced.enableRowBlock(4);
+        return advanced.build();
+    }
+
+    // common part of both infos: three varchar(10) columns, a bigint and a decimal; column 0 is the primary key
+    private static Builder infoBuilder() {
+        DataType[] columnTypes = { //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal") //
+        };
+
+        Builder b = GTInfo.builder();
+        b.setCodeSystem(new GTSampleCodeSystem());
+        b.setColumns(columnTypes);
+        b.setPrimaryKey(setOf(0));
+        b.setColumnPreferIndex(setOf(0));
+        return b;
+    }
+
+    /**
+     * Generates mock records in rounds of 10 rows. Each round shifts its four
+     * base dates by 4 more days than the previous round. Only {@code nRows / 10}
+     * full rounds are produced, so a row count that is not a multiple of 10 is
+     * rounded down.
+     */
+    public static List<GTRecord> mockupData(GTInfo info, int nRows) {
+        final String[] baseDates = { "2015-01-14", "2015-01-15", "2015-01-16", "2015-01-17" };
+        final String[] names = { "Yang", "Luke", "Xu", "Dong", "Jason", "Mahone", "Shaofeng", "Qianhao", "George", "Kejia" };
+        // index into baseDates for each name above
+        final int[] dateOfName = { 0, 0, 1, 1, 1, 2, 2, 2, 2, 3 };
+
+        List<GTRecord> rows = new ArrayList<GTRecord>(nRows);
+        int rounds = nRows / 10;
+        for (int round = 0; round < rounds; round++) {
+            String[] dates = new String[baseDates.length];
+            for (int d = 0; d < baseDates.length; d++) {
+                dates[d] = datePlus(baseDates[d], round * 4);
+            }
+            for (int n = 0; n < names.length; n++) {
+                rows.add(newRec(info, dates[dateOfName[n]], names[n], "Food", new LongWritable(10), new BigDecimal("10.5")));
+            }
+        }
+        return rows;
+    }
+
+    // shifts a date string by the given number of days
+    private static String datePlus(String date, int plusDays) {
+        final long millisPerDay = 1000L * 3600L * 24L;
+        long shifted = DateFormat.stringToMillis(date) + millisPerDay * plusDays;
+        return DateFormat.formatToDateStr(shifted);
+    }
+
+    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
+        return new GTRecord(info).setValues(date, name, category, amount, price);
+    }
+
+    // builds an immutable bit set with the given bits set
+    private static ImmutableBitSet setOf(int... values) {
+        BitSet bits = new BitSet();
+        for (int idx = 0; idx < values.length; idx++) {
+            bits.set(values[idx]);
+        }
+        return new ImmutableBitSet(bits);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index bcf0f29..73f92c0 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -21,15 +21,11 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.storage.gridtable.GTInfo.Builder;
 import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
 import org.junit.Test;
 
@@ -37,7 +33,7 @@ public class SimpleGridTableTest {
 
     @Test
     public void testBasics() throws IOException {
-        GTInfo info = basicInfo();
+        GTInfo info = UnitTestSupport.basicInfo();
         GTSimpleMemStore store = new GTSimpleMemStore(info);
         GridTable table = new GridTable(info, store);
 
@@ -49,7 +45,7 @@ public class SimpleGridTableTest {
 
     @Test
     public void testAdvanced() throws IOException {
-        GTInfo info = advancedInfo();
+        GTInfo info = UnitTestSupport.advancedInfo();
         GTSimpleMemStore store = new GTSimpleMemStore(info);
         GridTable table = new GridTable(info, store);
 
@@ -61,7 +57,7 @@ public class SimpleGridTableTest {
 
     @Test
     public void testAggregate() throws IOException {
-        GTInfo info = advancedInfo();
+        GTInfo info = UnitTestSupport.advancedInfo();
         GTSimpleMemStore store = new GTSimpleMemStore(info);
         GridTable table = new GridTable(info, store);
 
@@ -73,7 +69,7 @@ public class SimpleGridTableTest {
 
     @Test
     public void testAppend() throws IOException {
-        GTInfo info = advancedInfo();
+        GTInfo info = UnitTestSupport.advancedInfo();
         GTSimpleMemStore store = new GTSimpleMemStore(info);
         GridTable table = new GridTable(info, store);
 
@@ -137,7 +133,7 @@ public class SimpleGridTableTest {
 
     static GTBuilder rebuild(GridTable table) throws IOException {
         GTBuilder builder = table.rebuild();
-        for (GTRecord rec : mockupData(table.getInfo(), 10)) {
+        for (GTRecord rec : UnitTestSupport.mockupData(table.getInfo(), 10)) {
             builder.write(rec);
         }
         builder.close();
@@ -147,41 +143,8 @@ public class SimpleGridTableTest {
         return builder;
     }
     
-    public static List<GTRecord> mockupData(GTInfo info, int nRows) {
-        List<GTRecord> result = new ArrayList<GTRecord>(nRows);
-        int round = nRows / 10;
-        for (int i = 0; i < round; i++) {
-            String d_01_14 = datePlus("2015-01-14", i * 4);
-            String d_01_15 = datePlus("2015-01-15", i * 4);
-            String d_01_16 = datePlus("2015-01-16", i * 4);
-            String d_01_17 = datePlus("2015-01-17", i * 4);
-            result.add(newRec(info, d_01_14, "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_14, "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_15, "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_15, "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_15, "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_17, "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
-        }
-        return result;
-    }
-    
-    private static String datePlus(String date, int plusDays) {
-        long millis = DateFormat.stringToMillis(date);
-        millis += (1000L * 3600L * 24L) * plusDays;
-        return DateFormat.formatToDateStr(millis);
-    }
-
-    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
-        GTRecord rec = new GTRecord(info);
-        return rec.setValues(date, name, category, amount, price);
-    }
-
     static void rebuildViaAppend(GridTable table) throws IOException {
-        List<GTRecord> data = mockupData(table.getInfo(), 10);
+        List<GTRecord> data = UnitTestSupport.mockupData(table.getInfo(), 10);
         GTBuilder builder;
         int i = 0;
 
@@ -216,35 +179,6 @@ public class SimpleGridTableTest {
         System.out.println("Written Row Count: " + builder.getWrittenRowCount());
     }
 
-    public static GTInfo basicInfo() {
-        Builder builder = infoBuilder();
-        GTInfo info = builder.build();
-        return info;
-    }
-
-    public static GTInfo advancedInfo() {
-        Builder builder = infoBuilder();
-        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0), setOf(1, 2), setOf(3, 4) });
-        builder.enableRowBlock(4);
-        GTInfo info = builder.build();
-        return info;
-    }
-
-    private static Builder infoBuilder() {
-        Builder builder = GTInfo.builder();
-        builder.setCodeSystem(new GTSampleCodeSystem());
-        builder.setColumns( //
-                DataType.getInstance("varchar(10)"), //
-                DataType.getInstance("varchar(10)"), //
-                DataType.getInstance("varchar(10)"), //
-                DataType.getInstance("bigint"), //
-                DataType.getInstance("decimal") //
-        );
-        builder.setPrimaryKey(setOf(0));
-        builder.setColumnPreferIndex(setOf(0));
-        return builder;
-    }
-
     private static ImmutableBitSet setOf(int... values) {
         BitSet set = new BitSet();
         for (int i : values)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d27319b5/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
index 6cfb32f..ee08f2a 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
@@ -48,7 +48,7 @@ public class SimpleInvertedIndexTest {
 
     public SimpleInvertedIndexTest() {
         
-        info = SimpleGridTableTest.advancedInfo();
+        info = UnitTestSupport.advancedInfo();
         TblColRef colA = info.colRef(0);
         
         // block i contains value "i", the last is NULL