You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:32 UTC

[24/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java
new file mode 100644
index 0000000..3e34bf4
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRawScanner.java
@@ -0,0 +1,111 @@
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.IGTStore.IGTStoreScanner;
+
+public class GTRawScanner implements IGTScanner {
+
+    final GTInfo info;
+    final IGTStoreScanner storeScanner;
+    final ImmutableBitSet selectedColBlocks;
+
+    private GTRowBlock.Reader curBlockReader;
+    private GTRecord next;
+    final private GTRecord oneRecord; // avoid instance creation
+
+    private int scannedRowCount = 0;
+    private int scannedRowBlockCount = 0;
+
+    public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
+        this.info = info;
+        this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
+        this.storeScanner = store.scan(req.getPkStart(), req.getPkEnd(), selectedColBlocks, req);
+        this.oneRecord = new GTRecord(info);
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return scannedRowCount;
+    }
+
+    @Override
+    public int getScannedRowBlockCount() {
+        return scannedRowBlockCount;
+    }
+
+    @Override
+    public void close() throws IOException {
+        storeScanner.close();
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return new Iterator<GTRecord>() {
+
+            @Override
+            public boolean hasNext() {
+                if (next != null)
+                    return true;
+
+                if (fetchOneRecord()) {
+                    next = oneRecord;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            private boolean fetchOneRecord() {
+                while (true) {
+                    // get a block
+                    if (curBlockReader == null) {
+                        if (storeScanner.hasNext()) {
+                            curBlockReader = storeScanner.next().getReader(selectedColBlocks);
+                            scannedRowBlockCount++;
+                        } else {
+                            return false;
+                        }
+                    }
+                    // if block exhausted, try next block
+                    if (curBlockReader.hasNext() == false) {
+                        curBlockReader = null;
+                        continue;
+                    }
+                    // fetch a row
+                    curBlockReader.fetchNext(oneRecord);
+                    scannedRowCount++;
+                    return true;
+                }
+            }
+
+            @Override
+            public GTRecord next() {
+                // fetch next record
+                if (next == null) {
+                    hasNext();
+                    if (next == null)
+                        throw new NoSuchElementException();
+                }
+
+                GTRecord result = next;
+                next = null;
+                return result;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
new file mode 100644
index 0000000..ef2efe0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -0,0 +1,285 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+
+public class GTRecord implements Comparable<GTRecord> {
+    // one row of a grid table, every column is held as a ByteArray (array + offset + length)
+    final GTInfo info;
+    final ByteArray[] cols; // indexed by column, left null for columns outside the mask given to the constructor
+
+    private ImmutableBitSet maskForEqualHashComp; // columns that take part in equals(), hashCode() and compareTo()
+
+    public GTRecord(GTInfo info, ImmutableBitSet maskForEqualHashComp) {
+        this.info = info;
+        this.cols = new ByteArray[info.getColumnCount()];
+        for (int i = 0; i < cols.length; i++) {
+            if (maskForEqualHashComp.get(i)) { // only masked columns get a (still empty) ByteArray
+                this.cols[i] = new ByteArray();
+            }
+        }
+        this.maskForEqualHashComp = maskForEqualHashComp;
+    }
+
+    public GTRecord(GTInfo info) { // all columns are allocated and take part in equal/hash/compare
+        this(info, info.colAll);
+    }
+
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    public ByteArray get(int i) { // may return null for a column outside the construction mask
+        return cols[i];
+    }
+
+    public void set(int i, ByteArray data) { // column i must be allocated; it takes the array/offset/length of data
+        cols[i].set(data.array(), data.offset(), data.length());
+    }
+
+    /** set record to the codes of specified values, new space allocated to hold the codes */
+    public GTRecord setValues(Object... values) {
+        setValues(info.colAll, new ByteArray(info.getMaxRecordLength()), values);
+        return this;
+    }
+
+    /** set record to the codes of specified values, reuse given space to hold the codes; values[i] goes to the i-th selected column */
+    public GTRecord setValues(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
+        assert selectedCols.cardinality() == values.length;
+
+        ByteBuffer buf = space.asBuffer();
+        int pos = buf.position(); // start of the code about to be written
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            info.codeSystem.encodeColumnValue(c, values[i], buf);
+            int newPos = buf.position();
+            cols[c].set(buf.array(), buf.arrayOffset() + pos, newPos - pos); // column covers the bytes just encoded
+            pos = newPos;
+        }
+        return this;
+    }
+
+    /** decode and return the values of this record, all columns */
+    public Object[] getValues() {
+        return getValues(info.colAll, new Object[info.getColumnCount()]);
+    }
+
+    /** decode and return the values of the selected columns, result[i] is the i-th selected column, null if the column has no data */
+    public Object[] getValues(ImmutableBitSet selectedCols, Object[] result) {
+        assert selectedCols.cardinality() == result.length;
+
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            if (cols[c] == null || cols[c].array() == null) {
+                result[i] = null;
+            } else {
+                result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+            }
+        }
+        return result;
+    }
+
+    public Object[] getValues(int[] selectedColumns, Object[] result) { // same as above, columns given by index
+        assert selectedColumns.length <= result.length;
+        for (int i = 0; i < selectedColumns.length; i++) {
+            int c = selectedColumns[i];
+            if (cols[c] == null || cols[c].array() == null) { // unallocated column reads as null, like the bit set variant
+                result[i] = null;
+            } else {
+                result[i] = info.codeSystem.decodeColumnValue(c, cols[c].asBuffer());
+            }
+        }
+        return result;
+    }
+
+    public GTRecord copy() { // deep copy of all columns
+        return copy(info.colAll);
+    }
+
+    public GTRecord copy(ImmutableBitSet selectedCols) { // deep copy of selected columns into one new byte array
+        int len = 0;
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            len += cols[c].length();
+        }
+
+        byte[] space = new byte[len];
+
+        GTRecord copy = new GTRecord(info, this.maskForEqualHashComp); // NOTE(review): selected columns outside the mask are not allocated in the copy
+        int pos = 0;
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            System.arraycopy(cols[c].array(), cols[c].offset(), space, pos, cols[c].length());
+            copy.cols[c].set(space, pos, cols[c].length());
+            pos += cols[c].length();
+        }
+
+        return copy;
+    }
+
+    public ImmutableBitSet maskForEqualHashComp() {
+        return maskForEqualHashComp;
+    }
+
+    public void maskForEqualHashComp(ImmutableBitSet set) { // changes which columns equals/hashCode/compareTo look at
+        this.maskForEqualHashComp = set;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+
+        GTRecord o = (GTRecord) obj;
+        if (this.info != o.info) // reference comparison
+            return false;
+        if (this.maskForEqualHashComp != o.maskForEqualHashComp) // reference comparison, same as in compareTo()
+            return false;
+        for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
+            int c = maskForEqualHashComp.trueBitAt(i);
+            if (this.cols[c].equals(o.cols[c]) == false) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() { // hashes the masked columns only, in line with equals()
+        int hash = 1;
+        for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
+            int c = maskForEqualHashComp.trueBitAt(i);
+            hash = (31 * hash) + cols[c].hashCode();
+        }
+        return hash;
+    }
+
+    @Override
+    public int compareTo(GTRecord o) { // compares masked columns in order, first difference decides
+        assert this.info == o.info;
+        assert this.maskForEqualHashComp == o.maskForEqualHashComp; // reference equal for performance
+        IGTComparator comparator = info.codeSystem.getComparator();
+
+        int comp = 0;
+        for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) {
+            int c = maskForEqualHashComp.trueBitAt(i);
+            comp = comparator.compare(cols[c], o.cols[c]);
+            if (comp != 0)
+                return comp;
+        }
+        return comp;
+    }
+
+    @Override
+    public String toString() { // decoded values of the masked columns
+        return toString(maskForEqualHashComp);
+    }
+
+    public String toString(ImmutableBitSet selectedColumns) { // decoded values of the selected columns
+        Object[] values = new Object[selectedColumns.cardinality()];
+        getValues(selectedColumns, values);
+        return Arrays.toString(values);
+    }
+
+    // ============================================================================
+    /** export the selected columns into a newly allocated byte array of the exact size */
+    public ByteArray exportColumns(ImmutableBitSet selectedCols) {
+        int len = 0;
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            len += cols[c].length();
+        }
+
+        ByteArray buf = ByteArray.allocate(len);
+        exportColumns(selectedCols, buf); // export the same columns the buffer was sized for, not the primary key
+        return buf;
+    }
+
+    /** write data to given buffer, like serialize; the length of buf is set to the bytes written */
+    public void exportColumns(ImmutableBitSet selectedCols, ByteArray buf) {
+        int pos = 0;
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            System.arraycopy(cols[c].array(), cols[c].offset(), buf.array(), buf.offset() + pos, cols[c].length());
+            pos += cols[c].length();
+        }
+        buf.setLength(pos);
+    }
+
+    /** write data to given buffer, like serialize */
+    public void exportColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            buf.put(cols[c].array(), cols[c].offset(), cols[c].length());
+        }
+    }
+
+    public void exportColumns(int[] fieldIndex, ByteBuffer buf) { // same as above, columns given by index, in the given order
+        for (int i : fieldIndex) {
+            buf.put(cols[i].array(), cols[i].offset(), cols[i].length());
+        }
+    }
+
+
+    /** write data to given buffer, like serialize; c is the index of a column block */
+    public void exportColumnBlock(int c, ByteBuffer buf) {
+        exportColumns(info.colBlocks[c], buf);
+    }
+
+    /** change pointers to point to data in given buffer, UNLIKE deserialize */
+    public void loadPrimaryKey(ByteBuffer buf) {
+        loadColumns(info.primaryKey, buf);
+    }
+
+    /** change pointers to point to data in given buffer, UNLIKE deserialize; c is the index of a column block */
+    public void loadCellBlock(int c, ByteBuffer buf) {
+        loadColumns(info.colBlocks[c], buf);
+    }
+
+    /** change pointers to point to data in given buffer, UNLIKE deserialize; the buffer position moves past the loaded codes */
+    public void loadColumns(ImmutableBitSet selectedCols, ByteBuffer buf) {
+        int pos = buf.position();
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            int len = info.codeSystem.codeLength(c, buf); // length of the code at the current buffer position
+            cols[c].set(buf.array(), buf.arrayOffset() + pos, len);
+            pos += len;
+            buf.position(pos);
+        }
+    }
+
+    /** similar to export(primaryKey) but will stop at the first null value; returns null if rec is null or its first key column is null */
+    public static ByteArray exportScanKey(GTRecord rec) {
+        if (rec == null)
+            return null;
+
+        GTInfo info = rec.getInfo();
+
+        BitSet selectedColumns = new BitSet();
+        int len = 0;
+        for (int i = 0; i < info.primaryKey.trueBitCount(); i++) {
+            int c = info.primaryKey.trueBitAt(i);
+            if (rec.cols[c].array() == null) {
+                break;
+            }
+            selectedColumns.set(c);
+            len += rec.cols[c].length();
+        }
+
+        if (selectedColumns.cardinality() == 0)
+            return null;
+
+        ByteArray buf = ByteArray.allocate(len);
+        rec.exportColumns(new ImmutableBitSet(selectedColumns), buf);
+        return buf;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java
new file mode 100644
index 0000000..2c5111b
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRowBlock.java
@@ -0,0 +1,259 @@
+package org.apache.kylin.gridtable;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+
+public class GTRowBlock {
+    // a block of rows: the primary key of the first row plus one byte array of cells per column block
+    /** create a row block, allocate memory, get ready for writing */
+    public static GTRowBlock allocate(GTInfo info) {
+        GTRowBlock b = new GTRowBlock(info);
+
+        byte[] array = new byte[info.getMaxColumnLength(info.primaryKey)];
+        b.primaryKey.set(array);
+
+        int maxRows = info.isRowBlockEnabled() ? info.rowBlockSize : 1; // a block holds a single row when row block is disabled
+        for (int i = 0; i < b.cellBlocks.length; i++) {
+            array = new byte[info.getMaxColumnLength(info.colBlocks[i]) * maxRows];
+            b.cellBlocks[i].set(array);
+        }
+        return b;
+    }
+
+    final GTInfo info;
+
+    int seqId; // 0, 1, 2...
+    int nRows;
+    ByteArray primaryKey; // the primary key of the first (smallest) row
+    ByteArray[] cellBlocks; // cells for each column block
+
+    /** create a row block that has no underlying space */
+    public GTRowBlock(GTInfo info) {
+        this.info = info;
+        this.primaryKey = new ByteArray();
+        this.cellBlocks = new ByteArray[info.colBlocks.length];
+        for (int i = 0; i < this.cellBlocks.length; i++) {
+            this.cellBlocks[i] = new ByteArray();
+        }
+    }
+
+    public int getSequenceId() {
+        return seqId;
+    }
+
+    public ByteArray getPrimaryKey() {
+        return primaryKey;
+    }
+
+    public ByteArray getCellBlock(int i) {
+        return cellBlocks[i];
+    }
+
+    public Writer getWriter() {
+        return new Writer();
+    }
+
+    public class Writer { // appends records into the space of the enclosing block
+        ByteBuffer[] cellBlockBuffers; // one buffer per column block, obtained from cellBlocks[i].asBuffer()
+
+        Writer() {
+            cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
+            for (int i = 0; i < cellBlockBuffers.length; i++) {
+                cellBlockBuffers[i] = cellBlocks[i].asBuffer();
+            }
+        }
+
+        public void copyFrom(GTRowBlock other) { // copy id, row count, primary key and cell bytes of another block
+            assert info == other.info;
+
+            seqId = other.seqId;
+            nRows = other.nRows;
+            primaryKey.copyFrom(other.primaryKey);
+            for (int i = 0; i < info.colBlocks.length; i++) {
+                cellBlockBuffers[i].clear();
+                cellBlockBuffers[i].put(other.cellBlocks[i].array(), other.cellBlocks[i].offset(), other.cellBlocks[i].length());
+            }
+        }
+
+        public void append(GTRecord r) {
+            // add record to block, the first record also provides the primary key of the block
+            if (isEmpty()) {
+                r.exportColumns(info.primaryKey, primaryKey);
+            }
+            for (int i = 0; i < info.colBlocks.length; i++) {
+                r.exportColumnBlock(i, cellBlockBuffers[i]);
+            }
+            nRows++;
+        }
+
+        public void readyForFlush() { // publish the written size of each buffer as the length of its cell block
+            for (int i = 0; i < cellBlocks.length; i++) {
+                cellBlocks[i].setLength(cellBlockBuffers[i].position());
+            }
+        }
+
+        public void clearForNext() { // move to the next sequence id and start over with empty buffers
+            seqId++;
+            nRows = 0;
+            for (int i = 0; i < cellBlockBuffers.length; i++) {
+                cellBlockBuffers[i].clear();
+            }
+        }
+    }
+
+    public Reader getReader() { // reader over all column blocks
+        return new Reader(info.colBlocksAll);
+    }
+
+    public Reader getReader(ImmutableBitSet selectedColBlocks) { // reader over the selected column blocks only
+        return new Reader(selectedColBlocks);
+    }
+
+    public class Reader { // reads the rows of the enclosing block one by one
+        int cur; // number of rows fetched so far
+        ByteBuffer primaryKeyBuffer;
+        ByteBuffer[] cellBlockBuffers;
+        ImmutableBitSet selectedColBlocks;
+
+        Reader(ImmutableBitSet selectedColBlocks) {
+            primaryKeyBuffer = primaryKey.asBuffer();
+            cellBlockBuffers = new ByteBuffer[info.colBlocks.length];
+            for (int i = 0; i < cellBlockBuffers.length; i++) {
+                cellBlockBuffers[i] = cellBlocks[i].asBuffer();
+            }
+            this.selectedColBlocks = selectedColBlocks;
+        }
+
+        public boolean hasNext() {
+            return cur < nRows;
+        }
+
+        public void fetchNext(GTRecord result) { // point result to the next row; only selected column blocks are loaded
+            if (hasNext() == false)
+                throw new IllegalArgumentException();
+
+            for (int i = 0; i < selectedColBlocks.trueBitCount(); i++) {
+                int c = selectedColBlocks.trueBitAt(i);
+                result.loadCellBlock(c, cellBlockBuffers[c]);
+            }
+            cur++;
+        }
+    }
+
+    public GTRowBlock copy() { // export into a new buffer of exportLength() bytes, then load the copy from it
+        GTRowBlock copy = new GTRowBlock(info);
+
+        ByteBuffer buf = ByteBuffer.allocate(this.exportLength());
+        this.export(buf);
+        buf.clear(); // rewind to the start for loading
+        copy.load(buf);
+
+        return copy;
+    }
+
+    public boolean isEmpty() {
+        return nRows == 0;
+    }
+
+    public boolean isFull() { // with row block disabled, one row already makes the block full
+        if (info.isRowBlockEnabled())
+            return nRows >= info.rowBlockSize;
+        else
+            return nRows > 0;
+    }
+
+    public int getNumberOfRows() {
+        return nRows;
+    }
+
+    public void setNumberOfRows(int nRows) {
+        this.nRows = nRows;
+    }
+
+    // ============================================================================
+
+    public int exportLength() { // number of bytes export() writes for the current content
+        int len = 4; // seq Id
+        if (info.isRowBlockEnabled())
+            len += 4; // nRows
+        len += 4 + primaryKey.length(); // PK byte array
+        for (ByteArray array : cellBlocks) {
+            len += 4 + array.length(); // cell block byte array
+        }
+        return len;
+    }
+
+    /** write data to given output stream, like serialize; nRows is written only when row block is enabled */
+    public void export(DataOutputStream out) throws IOException {
+        out.writeInt(seqId);
+        if (info.isRowBlockEnabled())
+            out.writeInt(nRows);
+        export(out, primaryKey);
+        for (ByteArray cb : cellBlocks) {
+            export(out, cb);
+        }
+    }
+
+    private void export(DataOutputStream out, ByteArray array) throws IOException { // length prefix, then the bytes
+        out.writeInt(array.length());
+        out.write(array.array(), array.offset(), array.length());
+    }
+
+    /** write data to given buffer, like serialize; same layout as the stream variant */
+    public void export(ByteBuffer buf) {
+        buf.putInt(seqId);
+        if (info.isRowBlockEnabled())
+            buf.putInt(nRows);
+        export(primaryKey, buf);
+        for (ByteArray cb : cellBlocks) {
+            export(cb, buf);
+        }
+    }
+
+    private void export(ByteArray array, ByteBuffer buf) { // length prefix, then the bytes
+        buf.putInt(array.length());
+        buf.put(array.array(), array.offset(), array.length());
+    }
+
+    /** read data from given input stream, like deserialize; nRows is taken as 1 when row block is disabled */
+    public void importFrom(DataInputStream in) throws IOException {
+        seqId = in.readInt();
+        nRows = info.isRowBlockEnabled() ? in.readInt() : 1;
+        importFrom(in, primaryKey);
+        for (int i = 0; i < info.colBlocks.length; i++) {
+            ByteArray cb = cellBlocks[i];
+            importFrom(in, cb);
+        }
+    }
+
+    private void importFrom(DataInputStream in, ByteArray result) throws IOException { // length prefix, then the bytes
+        int len = in.readInt();
+        byte[] data = (result.array() != null && result.array().length >= len) ? result.array() : new byte[len]; // reuse space when big enough
+        in.readFully(data, 0, len); // a plain read() may return before len bytes arrive; readFully throws EOFException on a short stream
+        result.set(data, 0, len);
+    }
+
+    /** change pointers to point to data in given buffer, UNLIKE deserialize */
+    public void load(ByteBuffer buf) {
+        seqId = buf.getInt();
+        nRows = info.isRowBlockEnabled() ? buf.getInt() : 1;
+        load(primaryKey, buf);
+        for (int i = 0; i < info.colBlocks.length; i++) {
+            ByteArray cb = cellBlocks[i];
+            load(cb, buf);
+        }
+    }
+
+    private void load(ByteArray array, ByteBuffer buf) { // point array into the buffer and skip over its bytes
+        int len = buf.getInt();
+        int pos = buf.position();
+        array.set(buf.array(), buf.arrayOffset() + pos, len);
+        buf.position(pos + len);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
new file mode 100644
index 0000000..e6e00b7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -0,0 +1,101 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
+
+/**
+ * This is just for example and is INCORRECT when numbers are encoded to bytes and compared in filter.
+ * 
+ * A correct implementation must ensure dimension values preserve order after encoded, e.g. by using an
+ * order preserving dictionary.
+ * 
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class GTSampleCodeSystem implements IGTCodeSystem {
+
+    private GTInfo info;
+    private DataTypeSerializer[] serializers; // one serializer per column, created in init()
+    private IGTComparator comparator; // created in init()
+
+    public GTSampleCodeSystem() {
+    }
+
+    @Override
+    public void init(GTInfo info) { // must be called before any other method, it creates the serializers and the comparator
+        this.info = info;
+
+        this.serializers = new DataTypeSerializer[info.getColumnCount()];
+        for (int i = 0; i < info.getColumnCount(); i++) {
+            this.serializers[i] = DataTypeSerializer.create(info.colTypes[i]);
+        }
+
+        this.comparator = new IGTComparator() {
+            @Override
+            public boolean isNull(ByteArray code) {
+                // all 0xff is null, and so is a code of zero length
+                byte[] array = code.array();
+                for (int i = 0, j = code.offset(), n = code.length(); i < n; i++, j++) {
+                    if (array[j] != (byte) 0xff)
+                        return false;
+                }
+                return true;
+            }
+
+            @Override
+            public int compare(ByteArray code1, ByteArray code2) { // delegates to ByteArray.compareTo()
+                return code1.compareTo(code2);
+            }
+        };
+    }
+
+    @Override
+    public int codeLength(int col, ByteBuffer buf) { // length of the code in buf, as peeked by the column serializer
+        return serializers[col].peekLength(buf);
+    }
+
+    @Override
+    public int maxCodeLength(int col) {
+        return serializers[col].maxLength();
+    }
+
+    @Override
+    public IGTComparator getComparator() {
+        return comparator;
+    }
+
+    // ============================================================================
+
+    @Override
+    public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
+        assert columns.trueBitCount() == aggrFunctions.length; // aggrFunctions[i] applies to the i-th set column
+
+        MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length];
+        for (int i = 0; i < result.length; i++) {
+            int col = columns.trueBitAt(i);
+            result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString());
+        }
+        return result;
+    }
+
+    @Override
+    public void encodeColumnValue(int col, Object value, ByteBuffer buf) { // serialize value into buf with the column serializer
+        serializers[col].serialize(value, buf);
+    }
+
+    @Override
+    public void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) {
+        // ignore rounding
+        encodeColumnValue(col, value, buf);
+    }
+
+    @Override
+    public Object decodeColumnValue(int col, ByteBuffer buf) { // deserialize a value from buf with the column serializer
+        return serializers[col].deserialize(buf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
new file mode 100644
index 0000000..197fde4
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java
@@ -0,0 +1,83 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Collections;
+import java.util.List;
+
+public class GTScanRange {
+
+    final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
+    final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded
+    final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys
+
+    /** a range between two primary keys, without fuzzy keys */
+    public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
+        this(pkStart, pkEnd, null);
+    }
+
+    /**
+     * Null fuzzy keys are kept as an empty list. NOTE both key records are modified here, see validateRangeKey().
+     */
+    public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) {
+        GTInfo info = pkStart.info;
+        assert info == pkEnd.info;
+
+        validateRangeKey(pkStart);
+        validateRangeKey(pkEnd);
+
+        this.pkStart = pkStart;
+        this.pkEnd = pkEnd;
+        if (hbaseFuzzyKeys == null)
+            this.hbaseFuzzyKeys = Collections.<GTRecord> emptyList();
+        else
+            this.hbaseFuzzyKeys = hbaseFuzzyKeys;
+    }
+
+    /** set the equal/hash/compare mask of the key to the primary key, and clear every key column after the first null one */
+    private void validateRangeKey(GTRecord pk) {
+        pk.maskForEqualHashComp(pk.info.primaryKey);
+        boolean nullSeen = false;
+        for (int i = 0; i < pk.info.primaryKey.trueBitCount(); i++) {
+            int col = pk.info.primaryKey.trueBitAt(i);
+            if (nullSeen == false) {
+                nullSeen = pk.cols[col].array() == null;
+                continue;
+            }
+            pk.cols[col].set(null, 0, 0);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        // combine fuzzy keys, end key and start key, in this order; null counts as 0
+        int hash = 1;
+        hash = 31 * hash + (hbaseFuzzyKeys == null ? 0 : hbaseFuzzyKeys.hashCode());
+        hash = 31 * hash + (pkEnd == null ? 0 : pkEnd.hashCode());
+        hash = 31 * hash + (pkStart == null ? 0 : pkStart.hashCode());
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+
+        // compare fuzzy keys, end key and start key, in this order
+        GTScanRange that = (GTScanRange) obj;
+        return same(hbaseFuzzyKeys, that.hbaseFuzzyKeys) && same(pkEnd, that.pkEnd) && same(pkStart, that.pkStart);
+    }
+
+    /** null safe equals, two nulls are equal */
+    private static boolean same(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    @Override
+    public String toString() {
+        // only the primary key columns of both ends are printed
+        String start = pkStart == null ? "null" : pkStart.toString(pkStart.info.primaryKey);
+        String end = pkEnd == null ? "null" : pkEnd.toString(pkEnd.info.primaryKey);
+        return start + "-" + end;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
new file mode 100644
index 0000000..c09ecf0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -0,0 +1,486 @@
+package org.apache.kylin.gridtable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class GTScanRangePlanner {
+
+    /** Above this many fuzzy keys a merged range drops all of them (see mergeKeyRange). */
+    private static final int MAX_HBASE_FUZZY_KEYS = 100;
+
+    private final GTInfo info;
+    // The code system's comparator under both readings of an unset ("unknown")
+    // value: as the smallest or as the biggest possible value.
+    private final ComparatorEx<ByteArray> byteUnknownIsSmaller;
+    private final ComparatorEx<ByteArray> byteUnknownIsBigger;
+    private final ComparatorEx<GTRecord> recordUnknownIsSmaller;
+    private final ComparatorEx<GTRecord> recordUnknownIsBigger;
+
+    /**
+     * Creates a planner for the given grid table and derives the four
+     * unknown-aware comparators from the table's code system comparator.
+     */
+    public GTScanRangePlanner(GTInfo info) {
+        this.info = info;
+
+        IGTComparator codeComparator = info.codeSystem.getComparator();
+        this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(codeComparator);
+        this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(codeComparator);
+        this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(codeComparator);
+        this.recordUnknownIsBigger = recordComparatorTreatsUnknownBigger(codeComparator);
+    }
+
+    /**
+     * Plans scan ranges for the filter without limiting how many ranges come back
+     * (delegates with {@code Integer.MAX_VALUE} as the maximum).
+     *
+     * @return the planned ranges; an empty list means the filter is always false
+     */
+    public List<GTScanRange> planScanRanges(TupleFilter filter) {
+        return planScanRanges(filter, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Plans scan ranges for the filter: flattens it to OR-of-AND form, turns every
+     * AND branch into one scan range, merges overlapping ranges and finally
+     * collapses the result when it holds more than {@code maxRanges} ranges.
+     *
+     * @param filter    the filter to plan for; may be null
+     * @param maxRanges the largest number of ranges to return before collapsing
+     * @return the planned ranges; an empty list means the filter is always false
+     */
+    public List<GTScanRange> planScanRanges(TupleFilter filter, int maxRanges) {
+        List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flattenToOrAndFilter(filter));
+
+        List<GTScanRange> candidates = Lists.newArrayListWithCapacity(orAndDimRanges.size());
+        for (Collection<ColumnRange> conjunction : orAndDimRanges) {
+            candidates.add(newScanRange(conjunction));
+        }
+
+        List<GTScanRange> merged = mergeOverlapRanges(candidates);
+        return mergeTooManyRanges(merged, maxRanges);
+    }
+
+    /**
+     * Builds one scan range from the column ranges of a single AND branch.
+     * Ranges on columns outside the primary key are ignored. For a primary key
+     * column the begin/end values go into the start/end keys, and each value of
+     * an equals set yields one single-column fuzzy key.
+     */
+    private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
+        GTRecord startKey = new GTRecord(info);
+        GTRecord endKey = new GTRecord(info);
+        List<GTRecord> fuzzyKeys = Lists.newArrayList();
+
+        for (ColumnRange dimRange : andDimRanges) {
+            int colIdx = dimRange.column.getColumnDesc().getZeroBasedIndex();
+            if (!info.primaryKey.get(colIdx))
+                continue;
+
+            startKey.set(colIdx, dimRange.begin);
+            endKey.set(colIdx, dimRange.end);
+
+            if (dimRange.equals == null)
+                continue;
+
+            // every fuzzy key of this column is compared on this column only
+            ImmutableBitSet singleColMask = new ImmutableBitSet(colIdx);
+            for (ByteArray value : dimRange.equals) {
+                GTRecord fuzzyKey = new GTRecord(info);
+                fuzzyKey.set(colIdx, value);
+                fuzzyKey.maskForEqualHashComp(singleColMask);
+                fuzzyKeys.add(fuzzyKey);
+            }
+        }
+
+        return new GTScanRange(startKey, endKey, fuzzyKeys);
+    }
+
+    /**
+     * Flattens the filter and normalizes it so that the root is an OR node.
+     * A flattened AND root is wrapped into a new single-child OR.
+     *
+     * @return the normalized filter, or null when {@code filter} is null
+     * @throws IllegalStateException if the root is neither AND nor OR after flattening
+     */
+    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
+        if (filter == null)
+            return null;
+
+        TupleFilter normalized = filter.flatFilter();
+
+        if (normalized.getOperator() == FilterOperatorEnum.AND) {
+            // lone AND branch: give it an OR parent
+            LogicalTupleFilter orWrapper = new LogicalTupleFilter(FilterOperatorEnum.OR);
+            orWrapper.addChild(normalized);
+            normalized = orWrapper;
+        }
+
+        if (normalized.getOperator() == FilterOperatorEnum.OR)
+            return normalized;
+
+        throw new IllegalStateException();
+    }
+
+    /**
+     * Translates an OR-of-AND filter into one collection of column ranges per AND
+     * branch and then pre-evaluates constant conditions.
+     *
+     * @param flatFilter an OR root whose children are all AND nodes, or null
+     * @return the OR list; for a null filter, a list holding a single empty AND
+     *         collection (read as "always true")
+     * @throws IllegalStateException if a child of the root is not an AND node
+     */
+    private List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
+        List<Collection<ColumnRange>> disjunction = Lists.newArrayList();
+
+        if (flatFilter == null) {
+            disjunction.add(Collections.<ColumnRange> emptyList());
+            return disjunction;
+        }
+
+        for (TupleFilter child : flatFilter.getChildren()) {
+            if (child.getOperator() == FilterOperatorEnum.AND) {
+                disjunction.add(translateToAndDimRanges(child.getChildren()));
+            } else {
+                throw new IllegalStateException("Filter should be AND instead of " + child);
+            }
+        }
+
+        return preEvaluateConstantConditions(disjunction);
+    }
+
+    /**
+     * Collapses the children of one AND node into at most one range per column.
+     * Only compare filters that reference a column take part; several conditions
+     * on the same column are AND-merged into the first range seen for it.
+     *
+     * @return the values view of the per-column map (it supports removal through
+     *         its iterator)
+     */
+    private Collection<ColumnRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters) {
+        Map<TblColRef, ColumnRange> rangeByColumn = new HashMap<TblColRef, ColumnRange>();
+
+        for (TupleFilter child : andFilters) {
+            if (!(child instanceof CompareTupleFilter))
+                continue;
+
+            CompareTupleFilter compare = (CompareTupleFilter) child;
+            if (compare.getColumn() == null)
+                continue;
+
+            @SuppressWarnings("unchecked")
+            ColumnRange fresh = new ColumnRange(compare.getColumn(), (Set<ByteArray>) compare.getValues(), compare.getOperator());
+
+            ColumnRange known = rangeByColumn.get(fresh.column);
+            if (known != null) {
+                known.andMerge(fresh);
+            } else {
+                rangeByColumn.put(fresh.column, fresh);
+            }
+        }
+
+        return rangeByColumn.values();
+    }
+
+    /**
+     * Simplifies the OR-of-AND ranges in place by evaluating constant conditions:
+     * always-true ranges are removed from their AND branch, an AND branch holding
+     * an always-false range is removed from the OR list, and an AND branch left
+     * empty makes the whole filter always true.
+     *
+     * @return the same list instance; an empty list means "always false", a list
+     *         with a single empty AND collection means "always true"
+     */
+    private List<Collection<ColumnRange>> preEvaluateConstantConditions(List<Collection<ColumnRange>> orAndRanges) {
+        boolean alwaysTrue = false;
+
+        for (Iterator<Collection<ColumnRange>> orIter = orAndRanges.iterator(); orIter.hasNext();) {
+            Collection<ColumnRange> conjunction = orIter.next();
+
+            boolean alwaysFalse = false;
+            for (Iterator<ColumnRange> andIter = conjunction.iterator(); andIter.hasNext();) {
+                ColumnRange range = andIter.next();
+                if (range.satisfyAll()) {
+                    andIter.remove();
+                } else if (range.satisfyNone()) {
+                    alwaysFalse = true;
+                }
+            }
+
+            if (alwaysFalse) {
+                orIter.remove();
+                continue;
+            }
+            if (conjunction.isEmpty()) {
+                // nothing constrains this branch, so the whole OR is true
+                alwaysTrue = true;
+                break;
+            }
+        }
+
+        if (alwaysTrue) {
+            orAndRanges.clear();
+            orAndRanges.add(Collections.<ColumnRange> emptyList());
+        }
+        return orAndRanges;
+    }
+
+    /**
+     * Merges scan ranges that overlap. The input list is sorted in place by start
+     * key (unknown values sorting first), then swept once: consecutive ranges whose
+     * start does not lie beyond the running end are grouped, and each group is
+     * merged by mergeKeyRange.
+     *
+     * @param ranges the ranges to merge; reordered by this method
+     * @return the input list itself when it holds at most one range, otherwise a
+     *         new list of merged ranges
+     */
+    private List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
+        if (ranges.size() <= 1) {
+            return ranges;
+        }
+
+        // sort ranges by start key
+        Collections.sort(ranges, new Comparator<GTScanRange>() {
+            @Override
+            public int compare(GTScanRange a, GTScanRange b) {
+                return recordUnknownIsSmaller.compare(a.pkStart, b.pkStart);
+            }
+        });
+
+        // merge the overlap range
+        List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
+        int mergeBeginIndex = 0;
+        GTRecord mergeEnd = ranges.get(0).pkEnd;
+        // the sweep starts at index 0, so the first range is tested against its own end key
+        for (int index = 0; index < ranges.size(); index++) {
+            GTScanRange range = ranges.get(index);
+
+            // if overlap, swallow it
+            // min()/max() return their first argument on ties, so each identity test
+            // below holds when start <= mergeEnd under that comparator; overlap is
+            // assumed if either reading of unknown values says so
+            if (recordUnknownIsSmaller.min(range.pkStart, mergeEnd) == range.pkStart //
+                    || recordUnknownIsBigger.max(mergeEnd, range.pkStart) == mergeEnd) {
+                mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+                continue;
+            }
+
+            // not overlap, split here
+            // NOTE(review): if the very first range fails the overlap test (start beyond
+            // its own end), this sub-list is empty and mergeKeyRange would throw - confirm
+            // that callers never produce such a range
+            GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index));
+            mergedRanges.add(mergedRange);
+
+            // start new split
+            mergeBeginIndex = index;
+            // NOTE(review): the new group's end still takes the previous group's end into
+            // account; presumably range.pkEnd is always the bigger one here - confirm
+            mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+        }
+
+        // don't miss the last range
+        GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size()));
+        mergedRanges.add(mergedRange);
+
+        return mergedRanges;
+    }
+
+    /**
+     * Merges several scan ranges into one. The merged range starts at the start
+     * key of the first element and ends at the biggest end key (unknown counting
+     * as biggest). Fuzzy keys of all ranges are concatenated, but dropped entirely
+     * when any range has none or when more than MAX_HBASE_FUZZY_KEYS accumulate.
+     *
+     * @param ranges a non-empty list; a single-element list is returned as that element
+     */
+    private GTScanRange mergeKeyRange(List<GTScanRange> ranges) {
+        GTScanRange head = ranges.get(0);
+        if (ranges.size() == 1)
+            return head;
+
+        GTRecord mergedEnd = head.pkEnd;
+        List<GTRecord> mergedFuzzyKeys = new ArrayList<GTRecord>();
+        boolean sawNonFuzzyRange = false;
+
+        for (GTScanRange each : ranges) {
+            if (each.hbaseFuzzyKeys.isEmpty())
+                sawNonFuzzyRange = true;
+            mergedFuzzyKeys.addAll(each.hbaseFuzzyKeys);
+            mergedEnd = recordUnknownIsBigger.max(mergedEnd, each.pkEnd);
+        }
+
+        // if any range is non-fuzzy, then all fuzzy keys must be cleared
+        // also too many fuzzy keys will slow down HBase scan
+        boolean keepFuzzyKeys = !sawNonFuzzyRange && mergedFuzzyKeys.size() <= MAX_HBASE_FUZZY_KEYS;
+        if (!keepFuzzyKeys) {
+            mergedFuzzyKeys.clear();
+        }
+
+        return new GTScanRange(head.pkStart, mergedEnd, mergedFuzzyKeys);
+    }
+
+    /**
+     * Enforces the range limit: when more than {@code maxRanges} ranges are given,
+     * all of them are collapsed into a single range; otherwise the list is
+     * returned unchanged.
+     */
+    private List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
+        boolean withinLimit = ranges.size() <= maxRanges;
+        if (withinLimit)
+            return ranges;
+
+        // TODO: check the distance between range and merge the large distance range
+        List<GTScanRange> collapsed = new ArrayList<GTScanRange>(1);
+        collapsed.add(mergeKeyRange(ranges));
+        return collapsed;
+    }
+
+    /**
+     * The set of values one column may take inside an AND branch, kept either as
+     * an explicit equals set (EQ / IN) or as a begin..end interval. An unset
+     * begin or end (ByteArray.EMPTY) means that side is unbounded.
+     */
+    private class ColumnRange {
+        private TblColRef column;
+        private ByteArray begin = ByteArray.EMPTY;
+        private ByteArray end = ByteArray.EMPTY;
+        private Set<ByteArray> equals; // null unless built from EQ / IN
+
+        /**
+         * Builds the range implied by a single comparison.
+         *
+         * @param column the compared column
+         * @param values the constant values of the comparison
+         * @param op     the comparison operator
+         * @throws UnsupportedOperationException for operators not listed below
+         */
+        public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
+            this.column = column;
+
+            switch (op) {
+            case EQ:
+            case IN:
+                equals = new HashSet<ByteArray>(values);
+                refreshBeginEndFromEquals();
+                break;
+            case LT:
+            case LTE:
+                end = byteUnknownIsBigger.max(values);
+                break;
+            case GT:
+            case GTE:
+                begin = byteUnknownIsSmaller.min(values);
+                break;
+            case NEQ:
+            case NOTIN:
+            case ISNULL:
+            case ISNOTNULL:
+                // let Optiq filter it!
+                break;
+            default:
+                throw new UnsupportedOperationException(op.name());
+            }
+        }
+
+        /** Overwrites every field of this range with the given values (no defensive copy). */
+        void copy(TblColRef column, ByteArray beginValue, ByteArray endValue, Set<ByteArray> equalValues) {
+            this.column = column;
+            this.begin = beginValue;
+            this.end = endValue;
+            this.equals = equalValues;
+        }
+
+        /** Re-derives begin/end as the min/max of the equals set, or unsets both when the set is empty. */
+        private void refreshBeginEndFromEquals() {
+            if (equals.isEmpty()) {
+                begin = ByteArray.EMPTY;
+                end = ByteArray.EMPTY;
+            } else {
+                begin = byteUnknownIsSmaller.min(equals);
+                end = byteUnknownIsBigger.max(equals);
+            }
+        }
+
+        /**
+         * @return true when this range does not restrict the column at all, i.e.
+         *         there is no equals set and both ends are unset (the NEQ case)
+         */
+        public boolean satisfyAll() {
+            // An empty equals set also has unset begin/end, but it matches nothing;
+            // it must not be reported as "matches everything".
+            return equals == null && begin.array() == null && end.array() == null;
+        }
+
+        /**
+         * @return true when no value can match: an empty equals set, or an interval
+         *         whose begin is greater than its end
+         */
+        public boolean satisfyNone() {
+            if (equals != null) {
+                return equals.isEmpty();
+            } else if (begin.array() != null && end.array() != null) {
+                return info.codeSystem.getComparator().compare(begin, end) > 0;
+            } else {
+                return false;
+            }
+        }
+
+        /** Narrows this range to the intersection of itself and {@code another} (same column). */
+        public void andMerge(ColumnRange another) {
+            assert this.column.equals(another.column);
+
+            if (another.satisfyAll()) {
+                return;
+            }
+
+            if (this.satisfyAll()) {
+                copy(another.column, another.begin, another.end, another.equals);
+                return;
+            }
+
+            // both are value sets: plain set intersection
+            if (this.equals != null && another.equals != null) {
+                this.equals.retainAll(another.equals);
+                refreshBeginEndFromEquals();
+                return;
+            }
+
+            // one value set and one interval: keep the values inside the interval
+            if (this.equals != null) {
+                this.equals = filter(this.equals, another.begin, another.end);
+                refreshBeginEndFromEquals();
+                return;
+            }
+
+            if (another.equals != null) {
+                this.equals = filter(another.equals, this.begin, this.end);
+                refreshBeginEndFromEquals();
+                return;
+            }
+
+            // two intervals: the tighter bound wins on each side
+            this.begin = byteUnknownIsSmaller.max(this.begin, another.begin);
+            this.end = byteUnknownIsBigger.min(this.end, another.end);
+        }
+
+        /**
+         * Selects the values lying within [beginValue, endValue], both ends
+         * inclusive; an unset bound does not exclude anything on its side.
+         *
+         * @return a new set holding the surviving values; the input set is not modified
+         */
+        private Set<ByteArray> filter(Set<ByteArray> equalValues, ByteArray beginValue, ByteArray endValue) {
+            Set<ByteArray> result = Sets.newHashSetWithExpectedSize(equalValues.size());
+            for (ByteArray v : equalValues) {
+                if (byteUnknownIsSmaller.compare(beginValue, v) <= 0 && byteUnknownIsBigger.compare(v, endValue) <= 0) {
+                    result.add(v);
+                }
+            }
+            // hand back the filtered copy, not the unfiltered input
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            if (equals == null) {
+                return column.getName() + " between " + begin + " and " + end;
+            } else {
+                return column.getName() + " in " + equals;
+            }
+        }
+    }
+
+    public static abstract class ComparatorEx<T> implements Comparator<T> {
+
+        public T min(Collection<T> v) {
+            if (v.size() <= 0) {
+                return null;
+            }
+
+            Iterator<T> iterator = v.iterator();
+            T min = iterator.next();
+            while (iterator.hasNext()) {
+                min = min(min, iterator.next());
+            }
+            return min;
+        }
+
+        public T max(Collection<T> v) {
+            if (v.size() <= 0) {
+                return null;
+            }
+
+            Iterator<T> iterator = v.iterator();
+            T max = iterator.next();
+            while (iterator.hasNext()) {
+                max = max(max, iterator.next());
+            }
+            return max;
+        }
+
+        public T min(T a, T b) {
+            return compare(a, b) <= 0 ? a : b;
+        }
+
+        public T max(T a, T b) {
+            return compare(a, b) >= 0 ? a : b;
+        }
+
+        public boolean between(T v, T start, T end) {
+            return compare(start, v) <= 0 && compare(v, end) <= 0;
+        }
+    }
+
+    /**
+     * Wraps the code comparator so that an unknown value (a ByteArray whose
+     * backing array is null) sorts before every known value. Two unknown values
+     * compare as equal, which keeps the comparator symmetric, as sorting requires,
+     * and lets multi-column comparisons move on to the next column.
+     *
+     * @param comp comparator used when both values are known
+     */
+    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownSmaller(final IGTComparator comp) {
+        return new ComparatorEx<ByteArray>() {
+            @Override
+            public int compare(ByteArray a, ByteArray b) {
+                if (a.array() == null)
+                    return b.array() == null ? 0 : -1; // unknown vs unknown is a tie
+                else if (b.array() == null)
+                    return 1;
+                else
+                    return comp.compare(a, b);
+            }
+        };
+    }
+
+    /**
+     * Wraps the code comparator so that an unknown value (a ByteArray whose
+     * backing array is null) sorts after every known value. Two unknown values
+     * compare as equal, which keeps the comparator symmetric, as sorting requires,
+     * and lets multi-column comparisons move on to the next column.
+     *
+     * @param comp comparator used when both values are known
+     */
+    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownBigger(final IGTComparator comp) {
+        return new ComparatorEx<ByteArray>() {
+            @Override
+            public int compare(ByteArray a, ByteArray b) {
+                if (a.array() == null)
+                    return b.array() == null ? 0 : 1; // unknown vs unknown is a tie
+                else if (b.array() == null)
+                    return -1;
+                else
+                    return comp.compare(a, b);
+            }
+        };
+    }
+
+    /**
+     * Creates a record comparator that compares column by column using the
+     * byte comparator that treats unknown values as smaller.
+     */
+    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownSmaller(IGTComparator comp) {
+        return new RecordComparator(byteComparatorTreatsUnknownSmaller(comp));
+    }
+
+    /**
+     * Creates a record comparator that compares column by column using the
+     * byte comparator that treats unknown values as bigger.
+     */
+    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownBigger(IGTComparator comp) {
+        return new RecordComparator(byteComparatorTreatsUnknownBigger(comp));
+    }
+
+    /**
+     * Compares two records column by column over the columns selected by the
+     * first record's equal/hash comparison mask; the first differing column
+     * decides the result.
+     */
+    private static class RecordComparator extends ComparatorEx<GTRecord> {
+        final ComparatorEx<ByteArray> comparator;
+
+        RecordComparator(ComparatorEx<ByteArray> byteComparator) {
+            this.comparator = byteComparator;
+        }
+
+        @Override
+        public int compare(GTRecord a, GTRecord b) {
+            // both records must share the same table info and the same mask instance
+            assert a.info == b.info;
+            assert a.maskForEqualHashComp() == b.maskForEqualHashComp();
+
+            ImmutableBitSet comparedCols = a.maskForEqualHashComp();
+            final int colCount = comparedCols.trueBitCount();
+            for (int n = 0; n < colCount; n++) {
+                int col = comparedCols.trueBitAt(n);
+                int diff = comparator.compare(a.cols[col], b.cols[col]);
+                if (diff != 0)
+                    return diff;
+            }
+            return 0; // equals
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
new file mode 100644
index 0000000..c5443b6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -0,0 +1,155 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Sets;
+
+public class GTScanRequest {
+
+    // basic
+    private GTInfo info;
+    private GTScanRange range;
+    private ImmutableBitSet columns;
+
+    // optional filtering
+    private TupleFilter filterPushDown;
+
+    // optional aggregation
+    private ImmutableBitSet aggrGroupBy;
+    private ImmutableBitSet aggrMetrics;
+    private String[] aggrMetricsFuncs;
+
+    public GTScanRequest(GTInfo info) {
+        this(info, null, null, null);
+    }
+
+    public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet columns, TupleFilter filterPushDown) {
+        this.info = info;
+        this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+        this.columns = columns;
+        this.filterPushDown = filterPushDown;
+        validate();
+    }
+
+    public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet aggrGroupBy, ImmutableBitSet aggrMetrics, //
+            String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
+        this(info, range, null, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown);
+    }
+
+    public GTScanRequest(GTInfo info, GTScanRange range, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
+        this.info = info;
+        this.range = range == null ? new GTScanRange(new GTRecord(info), new GTRecord(info)) : range;
+        this.columns = dimensions;
+        this.filterPushDown = filterPushDown;
+
+        this.aggrGroupBy = aggrGroupBy;
+        this.aggrMetrics = aggrMetrics;
+        this.aggrMetricsFuncs = aggrMetricsFuncs;
+
+        validate();
+    }
+
+    private void validate() {
+        if (range == null)
+            range = new GTScanRange(null, null);
+
+        if (hasAggregation()) {
+            if (aggrGroupBy.intersects(aggrMetrics))
+                throw new IllegalStateException();
+            if (aggrMetrics.cardinality() != aggrMetricsFuncs.length)
+                throw new IllegalStateException();
+
+            if (columns == null)
+                columns = ImmutableBitSet.EMPTY;
+            columns = columns.or(aggrGroupBy);
+            columns = columns.or(aggrMetrics);
+        }
+
+        if (columns == null)
+            columns = info.colAll;
+
+        if (hasFilterPushDown()) {
+            validateFilterPushDown();
+        }
+    }
+
+    private void validateFilterPushDown() {
+        if (hasFilterPushDown() == false)
+            return;
+
+        Set<TblColRef> filterColumns = Sets.newHashSet();
+        TupleFilter.collectColumns(filterPushDown, filterColumns);
+
+        for (TblColRef col : filterColumns) {
+            // filter columns must belong to the table
+            info.validateColRef(col);
+            // filter columns must be returned to satisfy upper layer evaluation (calcite)
+            columns = columns.set(col.getColumnDesc().getZeroBasedIndex());
+        }
+
+        // un-evaluatable filter must be removed
+        if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) {
+            Set<TblColRef> unevaluableColumns = Sets.newHashSet();
+            filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns);
+
+            // columns in un-evaluatable filter must be returned without loss so upper layer can do final evaluation
+            if (hasAggregation()) {
+                for (TblColRef col : unevaluableColumns) {
+                    aggrGroupBy = aggrGroupBy.set(col.getColumnDesc().getZeroBasedIndex());
+                }
+            }
+        }
+    }
+
+    public boolean hasFilterPushDown() {
+        return filterPushDown != null;
+    }
+
+    public boolean hasAggregation() {
+        return aggrGroupBy != null && aggrMetrics != null && aggrMetricsFuncs != null;
+    }
+
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    public GTRecord getPkStart() {
+        return range.pkStart;
+    }
+
+    public GTRecord getPkEnd() {
+        return range.pkEnd;
+    }
+
+    public ImmutableBitSet getColumns() {
+        return columns;
+    }
+
+    public TupleFilter getFilterPushDown() {
+        return filterPushDown;
+    }
+
+    public ImmutableBitSet getAggrGroupBy() {
+        return aggrGroupBy;
+    }
+
+    public ImmutableBitSet getAggrMetrics() {
+        return aggrMetrics;
+    }
+
+    public String[] getAggrMetricsFuncs() {
+        return aggrMetricsFuncs;
+    }
+
+    @Override
+    public String toString() {
+        return "GTScanRequest [range=" + range + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
new file mode 100644
index 0000000..94ac755
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -0,0 +1,221 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilterSerializer;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Sets;
+
+public class GTUtil {
+
+    static final TableDesc MOCKUP_TABLE = TableDesc.mockup("GT_MOCKUP_TABLE");
+
+    /**
+     * Creates a column reference on the mock-up table for grid table column
+     * {@code col}: the mock-up column gets id {@code col + 1} and the decimal
+     * text of {@code col} as its name.
+     */
+    static TblColRef tblColRef(int col, String datatype) {
+        return new TblColRef(ColumnDesc.mockup(MOCKUP_TABLE, col + 1, "" + col, datatype));
+    }
+
+    /**
+     * Converts the filter without column mapping and without encoding constants;
+     * only un-evaluatable parts are replaced, and their columns are added to
+     * {@code unevaluatableColumnCollector}.
+     */
+    public static TupleFilter convertFilterUnevaluatable(TupleFilter rootFilter, GTInfo info, //
+            Set<TblColRef> unevaluatableColumnCollector) {
+        return convertFilter(rootFilter, info, null, false, unevaluatableColumnCollector);
+    }
+
+    /**
+     * Converts the filter with constant encoding enabled, without column mapping
+     * and without a collector for un-evaluatable columns (null is passed).
+     */
+    public static TupleFilter convertFilterConstants(TupleFilter rootFilter, GTInfo info) {
+        return convertFilter(rootFilter, info, null, true, null);
+    }
+
+    /**
+     * Converts the filter with both column mapping (through {@code colMapping})
+     * and constant encoding enabled; columns of un-evaluatable parts are added to
+     * {@code unevaluatableColumnCollector}.
+     */
+    public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
+            List<TblColRef> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
+        return convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
+    }
+
+    /**
+     * Converts a filter for evaluation against a grid table. The filter is
+     * serialized with a decorator that rewrites the filters handed to it, and the
+     * resulting bytes are deserialized into the returned filter. The decorator:
+     * replaces un-evaluatable parts by TRUE (collecting their columns), maps
+     * columns onto grid table columns when {@code colMapping} is given, and
+     * encodes comparison constants when {@code encodeConstants} is true.
+     *
+     * @param rootFilter                   the filter to convert
+     * @param info                         the grid table whose code system encodes the constants
+     * @param colMapping                   position in this list = grid table column index; may be null
+     * @param encodeConstants              whether comparison constants are translated into codes
+     * @param unevaluatableColumnCollector receives columns of un-evaluatable parts; null is
+     *                                     passed by convertFilterConstants
+     */
+    // converts TblColRef to GridTable column, encode constants, drop unEvaluatable parts
+    private static TupleFilter convertFilter(TupleFilter rootFilter, final GTInfo info, //
+            final List<TblColRef> colMapping, final boolean encodeConstants, //
+            final Set<TblColRef> unevaluatableColumnCollector) {
+
+        IFilterCodeSystem<ByteArray> filterCodeSystem = wrap(info.codeSystem.getComparator());
+        
+        byte[] bytes = TupleFilterSerializer.serialize(rootFilter, new TupleFilterSerializer.Decorator() {
+            @Override
+            public TupleFilter onSerialize(TupleFilter filter) {
+                if (filter == null)
+                    return null;
+
+                // In case of NOT(unEvaluatableFilter), we should immediately replace it as TRUE,
+                // Otherwise, unEvaluatableFilter will later be replace with TRUE and NOT(unEvaluatableFilter)
+                // will always return FALSE.
+                // NOTE(review): the collector may be null here; presumably collectColumns tolerates that - confirm
+                if (filter.getOperator() == TupleFilter.FilterOperatorEnum.NOT && !TupleFilter.isEvaluableRecursively(filter)) {
+                    TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
+                    return ConstantTupleFilter.TRUE;
+                }
+
+                // shortcut for unEvaluatable filter
+                if (filter.isEvaluable() == false) {
+                    TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
+                    return ConstantTupleFilter.TRUE;
+                }
+
+                // map to column onto grid table
+                // NOTE(review): indexOf yields -1 for a column missing from colMapping; how
+                // info.colRef handles that is not visible here - confirm
+                if (colMapping != null && filter instanceof ColumnTupleFilter) {
+                    ColumnTupleFilter colFilter = (ColumnTupleFilter) filter;
+                    int gtColIdx = colMapping.indexOf(colFilter.getColumn());
+                    return new ColumnTupleFilter(info.colRef(gtColIdx));
+                }
+
+                // encode constants
+                if (encodeConstants && filter instanceof CompareTupleFilter) {
+                    return encodeConstants((CompareTupleFilter) filter);
+                }
+
+                return filter;
+            }
+
+            // Rebuilds a compare filter with its constants translated into codes. A filter
+            // without a column or without constant values is returned unchanged. When a
+            // constant cannot be encoded (translate() returns null) the comparison
+            // collapses to a constant TRUE / FALSE chosen per operator below.
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            private TupleFilter encodeConstants(CompareTupleFilter oldCompareFilter) {
+                // extract ColumnFilter & ConstantFilter
+                TblColRef externalCol = oldCompareFilter.getColumn();
+
+                if (externalCol == null) {
+                    return oldCompareFilter;
+                }
+
+                Collection constValues = oldCompareFilter.getValues();
+                if (constValues == null || constValues.isEmpty()) {
+                    return oldCompareFilter;
+                }
+
+                CompareTupleFilter newCompareFilter = new CompareTupleFilter(oldCompareFilter.getOperator());
+                newCompareFilter.addChild(new ColumnTupleFilter(externalCol));
+
+                // single-valued operators below look at the first constant only
+                Object firstValue = constValues.iterator().next();
+                int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : colMapping.indexOf(externalCol);
+
+                TupleFilter result;
+                ByteArray code;
+
+                // translate constant into code
+                // the third argument of translate() is the rounding flag handed to
+                // encodeColumnValue: 0 for EQ/IN/NEQ, 1 for LT/GTE, -1 for LTE/GT
+                // NOTE(review): exact rounding semantics live in the code system - confirm
+                switch (newCompareFilter.getOperator()) {
+                case EQ:
+                case IN:
+                    // un-encodable values are dropped; with none left nothing can match
+                    Set newValues = Sets.newHashSet();
+                    for (Object value : constValues) {
+                        code = translate(col, value, 0);
+                        if (code != null)
+                            newValues.add(code);
+                    }
+                    if (newValues.isEmpty()) {
+                        result = ConstantTupleFilter.FALSE;
+                    } else {
+                        newCompareFilter.addChild(new ConstantTupleFilter(newValues));
+                        result = newCompareFilter;
+                    }
+                    break;
+                case NEQ:
+                    code = translate(col, firstValue, 0);
+                    if (code == null) {
+                        result = ConstantTupleFilter.TRUE;
+                    } else {
+                        newCompareFilter.addChild(new ConstantTupleFilter(code));
+                        result = newCompareFilter;
+                    }
+                    break;
+                case LT:
+                    code = translate(col, firstValue, 1);
+                    if (code == null) {
+                        result = ConstantTupleFilter.TRUE;
+                    } else {
+                        newCompareFilter.addChild(new ConstantTupleFilter(code));
+                        result = newCompareFilter;
+                    }
+                    break;
+                case LTE:
+                    code = translate(col, firstValue, -1);
+                    if (code == null) {
+                        result = ConstantTupleFilter.FALSE;
+                    } else {
+                        newCompareFilter.addChild(new ConstantTupleFilter(code));
+                        result = newCompareFilter;
+                    }
+                    break;
+                case GT:
+                    code = translate(col, firstValue, -1);
+                    if (code == null) {
+                        result = ConstantTupleFilter.TRUE;
+                    } else {
+                        newCompareFilter.addChild(new ConstantTupleFilter(code));
+                        result = newCompareFilter;
+                    }
+                    break;
+                case GTE:
+                    code = translate(col, firstValue, 1);
+                    if (code == null) {
+                        result = ConstantTupleFilter.FALSE;
+                    } else {
+                        newCompareFilter.addChild(new ConstantTupleFilter(code));
+                        result = newCompareFilter;
+                    }
+                    break;
+                default:
+                    // NOTE(review): any other operator that arrives with a column and
+                    // non-empty values (NOTIN, for instance) ends up here - confirm intended
+                    throw new IllegalStateException("Cannot handle operator " + newCompareFilter.getOperator());
+                }
+                return result;
+            }
+
+            // scratch buffer reused by every translate() call of this decorator
+            transient ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+
+            // Encodes one constant with the table's code system and returns a copy of the
+            // written bytes, or null when the code system rejects the value with an
+            // IllegalArgumentException.
+            private ByteArray translate(int col, Object value, int roundingFlag) {
+                try {
+                    buf.clear();
+                    info.codeSystem.encodeColumnValue(col, value, roundingFlag, buf);
+                    return ByteArray.copyOf(buf.array(), 0, buf.position());
+                } catch (IllegalArgumentException ex) {
+                    return null;
+                }
+            }
+        }, filterCodeSystem);
+
+        return TupleFilterSerializer.deserialize(bytes, filterCodeSystem);
+    }
+
+    public static IFilterCodeSystem<ByteArray> wrap(final IGTComparator comp) {
+        return new IFilterCodeSystem<ByteArray>() {
+
+            @Override
+            public int compare(ByteArray o1, ByteArray o2) {
+                return comp.compare(o1, o2);
+            }
+
+            @Override
+            public boolean isNull(ByteArray code) {
+                return comp.isNull(code);
+            }
+
+            @Override
+            public void serialize(ByteArray code, ByteBuffer buffer) {
+                if (code == null)
+                    BytesUtil.writeByteArray(null, 0, 0, buffer);
+                else
+                    BytesUtil.writeByteArray(code.array(), code.offset(), code.length(), buffer);
+            }
+
+            @Override
+            public ByteArray deserialize(ByteBuffer buffer) {
+                return new ByteArray(BytesUtil.readByteArray(buffer));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
new file mode 100644
index 0000000..b8e59f6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GridTable.java
@@ -0,0 +1,61 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class GridTable implements Closeable {
+
+    final GTInfo info;
+    final IGTStore store;
+
+    public GridTable(GTInfo info, IGTStore store) {
+        this.info = info;
+        this.store = store;
+    }
+
+    public GTBuilder rebuild() throws IOException {
+        assert info.isShardingEnabled() == false;
+        return rebuild(-1);
+    }
+
+    public GTBuilder rebuild(int shard) throws IOException {
+        assert shard < info.nShards;
+        return new GTBuilder(info, shard, store);
+    }
+
+    public GTBuilder append() throws IOException {
+        assert info.isShardingEnabled() == false;
+        return append(-1);
+    }
+
+    public GTBuilder append(int shard) throws IOException {
+        return new GTBuilder(info, shard, store, true);
+    }
+
+    public IGTScanner scan(GTScanRequest req) throws IOException {
+        IGTScanner result = new GTRawScanner(info, store, req);
+        
+        if (req.hasFilterPushDown()) {
+            result = new GTFilterScanner(result, req);
+        }
+        if (req.hasAggregation()) {
+            result = new GTAggregateScanner(result, req);
+        }
+        return result;
+    }
+
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    public IGTStore getStore() {
+        return store;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (store instanceof Closeable) {
+            ((Closeable) store).close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
new file mode 100644
index 0000000..bb715b2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
@@ -0,0 +1,43 @@
+package org.apache.kylin.gridtable;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+
+/**
+ * Defines how column values are encoded into and decoded from bytes, how the resulting
+ * codes are compared, and which aggregators are used for metrics.
+ */
+public interface IGTCodeSystem {
+    
+    /**
+     * Bind this code system to a table definition.
+     * NOTE(review): presumably called once before any other method -- confirm against GTInfo.
+     */
+    void init(GTInfo info);
+
+    /** Return the comparator used to order codes and to recognize the NULL code */
+    IGTComparator getComparator();
+    
+    /** Return the length of code starting at the specified buffer, buffer position must not change after return */
+    int codeLength(int col, ByteBuffer buf);
+    
+    /** Return the max possible length of a column */
+    int maxCodeLength(int col);
+    
+    /**
+     * Encode a value into code.
+     * 
+     * @param col   index of the column the value belongs to
+     * @param value the value to encode
+     * @param buf   buffer that receives the code
+     * @throws IllegalArgumentException if the value is not in dictionary
+     */
+    void encodeColumnValue(int col, Object value, ByteBuffer buf) throws IllegalArgumentException;
+    
+    /**
+     * Encode a value into code, with option to floor rounding -1, no rounding 0,  or ceiling rounding 1
+     * 
+     * @param col          index of the column the value belongs to
+     * @param value        the value to encode
+     * @param roundingFlag -1 for floor rounding, 0 for no rounding, 1 for ceiling rounding
+     * @param buf          buffer that receives the code
+     * @throws IllegalArgumentException
+     * - if rounding=0 and the value is not in dictionary
+     * - if rounding=-1 and there's no equal or smaller value in dictionary
+     * - if rounding=1 and there's no equal or bigger value in dictionary
+     */
+    void encodeColumnValue(int col, Object value, int roundingFlag, ByteBuffer buf) throws IllegalArgumentException;
+    
+    /** Decode a code into value */
+    Object decodeColumnValue(int col, ByteBuffer buf);
+    
+    /**
+     * Return aggregators for metrics.
+     * NOTE(review): presumably aggrFunctions lines up one-to-one with the selected columns -- confirm with implementations.
+     */
+    MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java
new file mode 100644
index 0000000..4edc4db
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTComparator.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Comparator;
+
+import org.apache.kylin.common.util.ByteArray;
+
+/**
+ * A comparator over column codes held in {@link ByteArray} that can also tell whether a
+ * code represents the NULL value.
+ */
+public interface IGTComparator extends Comparator<ByteArray> {
+
+    /** if given code represents the NULL value */
+    boolean isNull(ByteArray code);
+
+    // Comparing two values by their codes is done through compare(ByteArray, ByteArray),
+    // which is inherited from Comparator<ByteArray> and therefore not re-declared here.
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java
new file mode 100644
index 0000000..42e2dec
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTScanner.java
@@ -0,0 +1,13 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+
+/**
+ * An iterable, closeable source of {@link GTRecord}s that also exposes scan counters.
+ */
+public interface IGTScanner extends Iterable<GTRecord>, Closeable {
+    
+    /** Return the table definition this scanner works with */
+    GTInfo getInfo();
+    
+    /** Return the scanned-row counter (NOTE(review): presumably rows read so far -- confirm with implementations) */
+    int getScannedRowCount();
+    
+    /** Return the scanned-row-block counter (NOTE(review): presumably row blocks read so far -- confirm with implementations) */
+    int getScannedRowBlockCount();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
new file mode 100644
index 0000000..70620b3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTStore.java
@@ -0,0 +1,26 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+
+/**
+ * Storage backend of a grid table: writes and scans {@link GTRowBlock}s.
+ */
+public interface IGTStore {
+
+    /** Return the table definition this store was created for */
+    GTInfo getInfo();
+    
+    /**
+     * Return a writer for rebuilding the given shard.
+     * NOTE(review): GTSimpleMemStore discards its existing blocks here; presumably other stores do the same -- confirm.
+     */
+    IGTStoreWriter rebuild(int shard) throws IOException;
+    
+    /**
+     * Return a writer for appending to the given shard.
+     * GTSimpleMemStore copies its last stored block into {@code fillLast} when one exists;
+     * NOTE(review): presumably so the caller can continue filling a partial block -- confirm.
+     */
+    IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException;
+    
+    /**
+     * Return a scanner over row blocks. GTRawScanner passes the request's pk start/end, the
+     * column blocks selected for the request, and the request itself as {@code additionalPushDown}.
+     */
+    IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
+
+    /** Receives row blocks to be stored; close when done */
+    interface IGTStoreWriter extends Closeable {
+        void write(GTRowBlock block) throws IOException;
+    }
+    
+    /** Iterates over stored row blocks; close when done */
+    interface IGTStoreScanner extends Iterator<GTRowBlock>, Closeable {
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
new file mode 100644
index 0000000..c6af8e5
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -0,0 +1,101 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Fixtures for grid table tests: two sample table definitions over the same five columns
+ * (three varchar(10), one bigint, one decimal) and a generator of sample records.
+ */
+public class UnitTestSupport {
+
+    /** Sample table definition without column blocks or row blocks. */
+    public static GTInfo basicInfo() {
+        return infoBuilder().build();
+    }
+
+    /** Sample table definition with column blocks {0}, {1,2}, {3,4} and row blocks of size 4. */
+    public static GTInfo advancedInfo() {
+        Builder advanced = infoBuilder();
+        ImmutableBitSet[] columnBlocks = { setOf(0), setOf(1, 2), setOf(3, 4) };
+        advanced.enableColumnBlock(columnBlocks);
+        advanced.enableRowBlock(4);
+        return advanced.build();
+    }
+
+    /** Common part of both definitions: code system, the five columns, and column 0 as primary key. */
+    private static Builder infoBuilder() {
+        Builder common = GTInfo.builder();
+        common.setCodeSystem(new GTSampleCodeSystem());
+        common.setColumns( //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("varchar(10)"), //
+                DataType.getInstance("bigint"), //
+                DataType.getInstance("decimal") //
+        );
+        ImmutableBitSet firstColumn = setOf(0);
+        common.setPrimaryKey(firstColumn);
+        common.setColumnPreferIndex(setOf(0));
+        return common;
+    }
+
+    /**
+     * Generates sample records in batches of ten. Each batch covers four consecutive days and
+     * the next batch starts four days later. Only {@code nRows / 10} whole batches are produced,
+     * so a row count that is not a multiple of ten is rounded down.
+     */
+    public static List<GTRecord> mockupData(GTInfo info, int nRows) {
+        final String[] firstDays = { "2015-01-14", "2015-01-15", "2015-01-16", "2015-01-17" };
+        final String[] names = { "Yang", "Luke", "Xu", "Dong", "Jason", "Mahone", "Shaofeng", "Qianhao", "George", "Kejia" };
+        // which of the four days each name falls on within a batch
+        final int[] dayOfName = { 0, 0, 1, 1, 1, 2, 2, 2, 2, 3 };
+
+        List<GTRecord> rows = new ArrayList<GTRecord>(nRows);
+        final int batches = nRows / 10;
+        for (int batch = 0; batch < batches; batch++) {
+            String[] dates = new String[firstDays.length];
+            for (int d = 0; d < dates.length; d++) {
+                dates[d] = datePlus(firstDays[d], batch * 4);
+            }
+            for (int n = 0; n < names.length; n++) {
+                rows.add(newRec(info, dates[dayOfName[n]], names[n], "Food", new LongMutable(10), new BigDecimal("10.5")));
+            }
+        }
+        return rows;
+    }
+    
+    /** Shifts a date string forward by the given number of 24-hour days. */
+    private static String datePlus(String date, int plusDays) {
+        final long dayMillis = 1000L * 3600L * 24L;
+        long shifted = DateFormat.stringToMillis(date) + dayMillis * plusDays;
+        return DateFormat.formatToDateStr(shifted);
+    }
+
+    /** Creates one record holding the five column values in table order. */
+    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
+        return new GTRecord(info).setValues(date, name, category, amount, price);
+    }
+
+    /** Builds an immutable bit set with exactly the given bits turned on. */
+    private static ImmutableBitSet setOf(int... values) {
+        BitSet bits = new BitSet();
+        for (int idx = 0; idx < values.length; idx++) {
+            bits.set(values[idx]);
+        }
+        return new ImmutableBitSet(bits);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
new file mode 100644
index 0000000..0011d83
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/memstore/GTSimpleMemStore.java
@@ -0,0 +1,112 @@
+package org.apache.kylin.gridtable.memstore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTStore;
+
+/**
+ * An {@link IGTStore} that keeps every row block in an in-memory list, positioned by the
+ * block's sequence id. Sharding is not supported.
+ */
+public class GTSimpleMemStore implements IGTStore {
+
+    final GTInfo info;
+    final List<GTRowBlock> rowBlockList;
+
+    /**
+     * @throws UnsupportedOperationException if the table definition has sharding enabled
+     */
+    public GTSimpleMemStore(GTInfo info) {
+        if (info.isShardingEnabled()) {
+            throw new UnsupportedOperationException();
+        }
+        this.info = info;
+        this.rowBlockList = new ArrayList<GTRowBlock>();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /**
+     * Estimates the memory held by the store as the export length of the first block
+     * multiplied by the number of blocks; zero for an empty store.
+     */
+    public long memoryUsage() {
+        if (rowBlockList.isEmpty()) {
+            return 0;
+        }
+        return rowBlockList.get(0).exportLength() * (long) rowBlockList.size();
+    }
+
+    /** Discards all stored blocks and returns a writer onto the now empty list. */
+    @Override
+    public IGTStoreWriter rebuild(int shard) {
+        rowBlockList.clear();
+        return new Writer(rowBlockList);
+    }
+
+    /**
+     * Returns a writer onto the existing list. When blocks are already stored, the last one is
+     * first copied into {@code fillLast}.
+     */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) {
+        if (!rowBlockList.isEmpty()) {
+            int lastIndex = rowBlockList.size() - 1;
+            fillLast.copyFrom(rowBlockList.get(lastIndex));
+        }
+        return new Writer(rowBlockList);
+    }
+
+    /** Stores a copy of each written block at the position given by its sequence id. */
+    private static class Writer implements IGTStoreWriter {
+
+        private final List<GTRowBlock> rowBlockList;
+
+        Writer(List<GTRowBlock> rowBlockList) {
+            this.rowBlockList = rowBlockList;
+        }
+
+        /** Nothing to release. */
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            GTRowBlock snapshot = block.copy();
+            int seq = block.getSequenceId();
+            if (seq >= rowBlockList.size()) {
+                // a new block is expected to land exactly at the end of the list
+                assert seq == rowBlockList.size();
+                rowBlockList.add(snapshot);
+            } else {
+                rowBlockList.set(seq, snapshot);
+            }
+        }
+    }
+
+    /**
+     * Returns a scanner over all stored blocks in list order; none of the arguments is used
+     * to narrow the scan.
+     */
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) {
+
+        return new IGTStoreScanner() {
+            final Iterator<GTRowBlock> blocks = rowBlockList.iterator();
+
+            @Override
+            public boolean hasNext() {
+                return blocks.hasNext();
+            }
+
+            @Override
+            public GTRowBlock next() {
+                return blocks.next();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+            /** Nothing to release. */
+            @Override
+            public void close() throws IOException {
+            }
+        };
+    }
+
+    /** Discards all stored blocks. */
+    public void drop() throws IOException {
+        //will there be any concurrent issue? If yes, ArrayList should be replaced
+        rowBlockList.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
new file mode 100644
index 0000000..0b4a3c6
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
@@ -0,0 +1,94 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.cube.inmemcubing.ConcurrentDiskStore;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.UnitTestSupport;
+import org.junit.Test;
+
+/**
+ * Writes a large data set into a {@link ConcurrentDiskStore} backed table and reads it back
+ * from one or many threads, checking that every reader sees the data exactly as written.
+ */
+public class ConcurrentDiskStoreTest {
+
+    final GTInfo info = UnitTestSupport.advancedInfo();
+    final List<GTRecord> data = UnitTestSupport.mockupData(info, 1000000); // converts to about 34 MB data
+
+    @Test
+    public void testSingleThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(1);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+
+    @Test
+    public void testMultiThreadRead() throws IOException, InterruptedException {
+        long start = System.currentTimeMillis();
+        verifyOneTableWriteAndRead(20);
+        long end = System.currentTimeMillis();
+        System.out.println("Cost " + (end - start) + " millis");
+    }
+    
+    private void verifyOneTableWriteAndRead(int readThreads) throws IOException, InterruptedException {
+        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+        GridTable table = new GridTable(info, store);
+        verifyWriteAndRead(table, readThreads);
+    }
+
+    /**
+     * Writes {@code data} into the table, then scans it concurrently from {@code readThreads}
+     * threads. A failure in any reader (a mismatching record, a wrong row count, or an
+     * exception) fails the test; the store is closed in every case.
+     */
+    private void verifyWriteAndRead(final GridTable table, int readThreads) throws IOException, InterruptedException {
+        try {
+            GTBuilder builder = table.rebuild();
+            for (GTRecord r : data) {
+                builder.write(r);
+            }
+            builder.close();
+
+            // One slot per reader. Each thread writes only its own slot and the slots are read
+            // after join(), so no further synchronization is needed.
+            final Throwable[] failures = new Throwable[readThreads];
+            Thread[] readers = new Thread[readThreads];
+            for (int i = 0; i < readThreads; i++) {
+                final int slot = i;
+                readers[i] = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo()));
+                            try {
+                                int row = 0;
+                                for (GTRecord r : scanner) {
+                                    assertEquals(data.get(row++), r);
+                                }
+                                // a scan that stops early must not pass silently
+                                assertEquals(data.size(), row);
+                            } finally {
+                                scanner.close();
+                            }
+                        } catch (Throwable ex) {
+                            // AssertionError is an Error, not an Exception, so catch Throwable;
+                            // it is re-raised on the test thread after join()
+                            failures[slot] = ex;
+                        }
+                    }
+                };
+                readers[i].start();
+            }
+            for (int i = 0; i < readThreads; i++) {
+                readers[i].join();
+            }
+
+            for (int i = 0; i < readThreads; i++) {
+                if (failures[i] != null) {
+                    AssertionError error = new AssertionError("Reader thread " + i + " failed: " + failures[i]);
+                    error.initCause(failures[i]);
+                    throw error;
+                }
+            }
+        } finally {
+            ((ConcurrentDiskStore) table.getStore()).close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
new file mode 100644
index 0000000..de949ba
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
@@ -0,0 +1,97 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stress test that runs a {@link DoggedCubeBuilder} with several threads against a large
+ * input and discards the output. It only verifies that the build completes without error.
+ */
+public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(DoggedCubeBuilderStressTest.class);
+
+    // CI sandbox memory is no more than 512MB; this much input should hit the memory threshold
+    private static final int INPUT_ROWS = 200000;
+    private static final int THREADS = 4;
+
+    private static CubeInstance cube;
+    private static String flatTable;
+    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+
+    @BeforeClass
+    public static void before() throws IOException {
+        staticCreateTestMetadata();
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        cube = cubeManager.getCube("test_kylin_cube_without_slr_left_join_empty");
+        flatTable = "../examples/test_case_data/localmeta/data/flatten_data_for_without_slr_left_join.csv";
+        dictionaryMap = InMemCubeBuilderTest.getDictionaryMap(cube, flatTable);
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        staticCleanupTestMetadata();
+    }
+
+    /**
+     * Runs the builder on a background thread while this thread feeds the input queue, then
+     * waits for the build to finish. The executor is always shut down so that a failure while
+     * feeding does not leave the builder thread blocked on the queue.
+     */
+    @Test
+    public void test() throws Exception {
+
+        ArrayBlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(1000);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        try {
+            long randSeed = System.currentTimeMillis();
+            // report the seed so that a failing run can be reproduced
+            logger.info("Random seed for input data: {}", randSeed);
+
+            DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
+            doggedBuilder.setConcurrentThreads(THREADS);
+
+            Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, new NoopWriter()));
+            InMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed);
+            future.get();
+        } finally {
+            // no-op when the build has finished; interrupts the builder thread otherwise
+            executorService.shutdownNow();
+        }
+    }
+
+    /** Cuboid writer that drops every record; the test is only about the build itself. */
+    class NoopWriter implements ICuboidWriter {
+        @Override
+        public void write(long cuboidId, GTRecord record) throws IOException {
+        }
+    }
+}
\ No newline at end of file