You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:33 UTC

[25/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
new file mode 100644
index 0000000..85ac47a
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -0,0 +1,661 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.kylin.cube.inmemcubing;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTAggregateScanner;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
+ * Assumes base cuboid fits in memory or otherwise OOM exception will occur.
+ */
+public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
+
+    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
+    private static final LongMutable ONE = new LongMutable(1l);
+
+    private final CuboidScheduler cuboidScheduler;
+    private final long baseCuboidId;
+    private final int totalCuboidCount;
+    private final CubeJoinedFlatTableDesc intermediateTableDesc;
+    private final MeasureCodec measureCodec;
+    private final String[] metricsAggrFuncs;
+    private final int[] hbaseMeasureRefIndex;
+    private final MeasureDesc[] measureDescs;
+    private final int measureCount;
+
+    private MemoryBudgetController memBudget;
+    private Thread[] taskThreads;
+    private Throwable[] taskThreadExceptions;
+    private LinkedBlockingQueue<CuboidTask> taskPending;
+    private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
+
+    private CuboidResult baseResult;
+    private Object[] totalSumForSanityCheck;
+    private ICuboidCollector resultCollector;
+
+    public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        super(cubeDesc, dictionaryMap);
+        this.cuboidScheduler = new CuboidScheduler(cubeDesc);
+        this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        this.totalCuboidCount = cuboidScheduler.getCuboidCount();
+        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+
+        Map<String, Integer> measureIndexMap = Maps.newHashMap();
+        List<String> metricsAggrFuncsList = Lists.newArrayList();
+        measureCount = cubeDesc.getMeasures().size();
+
+        List<MeasureDesc> measureDescsList = Lists.newArrayList();
+        hbaseMeasureRefIndex = new int[measureCount];
+        int measureRef = 0;
+        for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
+                    for (int j = 0; j < measureCount; j++) {
+                        if (cubeDesc.getMeasures().get(j).equals(measure)) {
+                            measureDescsList.add(measure);
+                            hbaseMeasureRefIndex[measureRef] = j;
+                            break;
+                        }
+                    }
+                    measureRef++;
+                }
+            }
+        }
+
+        for (int i = 0; i < measureCount; i++) {
+            MeasureDesc measureDesc = measureDescsList.get(i);
+            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
+            measureIndexMap.put(measureDesc.getName(), i);
+        }
+        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+    }
+
+    private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
+        GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
+
+        // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
+        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
+        // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
+        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
+
+        GridTable gridTable = new GridTable(info, store);
+        return gridTable;
+    }
+
+    private Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
+        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+        BitSet dimension = new BitSet();
+        dimension.set(0, bitSet.cardinality());
+        BitSet metrics = new BitSet();
+        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
+        return new Pair<ImmutableBitSet, ImmutableBitSet>(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics));
+    }
+
+    @Override
+    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
+        ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
+        for (CuboidResult cuboidResult : result.values()) {
+            outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+            cuboidResult.table.close();
+        }
+    }
+
+    ConcurrentNavigableMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
+        final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
+        build(input, new ICuboidCollector() {
+            @Override
+            public void collect(CuboidResult cuboidResult) {
+                result.put(cuboidResult.cuboidId, cuboidResult);
+            }
+        });
+        return result;
+    }
+
+    interface ICuboidCollector {
+        void collect(CuboidResult result);
+    }
+
+    static class CuboidResult {
+        public long cuboidId;
+        public GridTable table;
+        public int nRows;
+        public long timeSpent;
+        public int aggrCacheMB;
+
+        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+            this.cuboidId = cuboidId;
+            this.table = table;
+            this.nRows = nRows;
+            this.timeSpent = timeSpent;
+            this.aggrCacheMB = aggrCacheMB;
+        }
+    }
+
+    private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
+        long startTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build start, " + cubeDesc.getName());
+
+        // multiple threads to compute cuboid in parallel
+        taskPending = new LinkedBlockingQueue<>();
+        taskCuboidCompleted.set(0);
+        taskThreads = prepareTaskThreads();
+        taskThreadExceptions = new Throwable[taskThreadCount];
+
+        // build base cuboid
+        resultCollector = collector;
+        totalSumForSanityCheck = null;
+        baseResult = createBaseCuboid(input);
+        if (baseResult.nRows == 0)
+            return;
+
+        // plan memory budget
+        makeMemoryBudget();
+
+        // kick off N-D cuboid tasks and output
+        addChildTasks(baseResult);
+        start(taskThreads);
+
+        // wait complete
+        join(taskThreads);
+
+        long endTime = System.currentTimeMillis();
+        logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
+
+        throwExceptionIfAny();
+    }
+
+    public void abort() {
+        interrupt(taskThreads);
+    }
+
+    private void start(Thread... threads) {
+        for (Thread t : threads)
+            t.start();
+    }
+
+    private void interrupt(Thread... threads) {
+        for (Thread t : threads)
+            t.interrupt();
+    }
+
+    private void join(Thread... threads) throws IOException {
+        try {
+            for (Thread t : threads)
+                t.join();
+        } catch (InterruptedException e) {
+            throw new IOException("interrupted while waiting task and output complete", e);
+        }
+    }
+
+    private void throwExceptionIfAny() throws IOException {
+        ArrayList<Throwable> errors = new ArrayList<Throwable>();
+        for (int i = 0; i < taskThreadCount; i++) {
+            Throwable t = taskThreadExceptions[i];
+            if (t != null)
+                errors.add(t);
+        }
+        if (errors.isEmpty()) {
+            return;
+        } else if (errors.size() == 1) {
+            Throwable t = errors.get(0);
+            if (t instanceof IOException)
+                throw (IOException) t;
+            else
+                throw new IOException(t);
+        } else {
+            for (Throwable t : errors)
+                logger.error("Exception during in-mem cube build", t);
+            throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
+        }
+    }
+
+    private Thread[] prepareTaskThreads() {
+        Thread[] result = new Thread[taskThreadCount];
+        for (int i = 0; i < taskThreadCount; i++) {
+            result[i] = new CuboidTaskThread(i);
+        }
+        return result;
+    }
+
+    public boolean isAllCuboidDone() {
+        return taskCuboidCompleted.get() == totalCuboidCount;
+    }
+
+    private class CuboidTaskThread extends Thread {
+        private int id;
+
+        CuboidTaskThread(int id) {
+            super("CuboidTask-" + id);
+            this.id = id;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (!isAllCuboidDone()) {
+                    CuboidTask task = null;
+                    while (task == null && taskHasNoException()) {
+                        task = taskPending.poll(15, TimeUnit.SECONDS);
+                    }
+                    // if task error occurs
+                    if (task == null)
+                        break;
+
+                    CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
+                    addChildTasks(newCuboid);
+
+                    if (isAllCuboidDone()) {
+                        for (Thread t : taskThreads) {
+                            if (t != Thread.currentThread())
+                                t.interrupt();
+                        }
+                    }
+                }
+            } catch (Throwable ex) {
+                if (!isAllCuboidDone()) {
+                    logger.error("task thread exception", ex);
+                    taskThreadExceptions[id] = ex;
+                }
+            }
+        }
+    }
+
+    private boolean taskHasNoException() {
+        for (int i = 0; i < taskThreadExceptions.length; i++)
+            if (taskThreadExceptions[i] != null)
+                return false;
+        return true;
+    }
+
+    private void addChildTasks(CuboidResult parent) {
+        List<Long> children = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
+        for (Long child : children) {
+            taskPending.add(new CuboidTask(parent, child));
+        }
+    }
+
+    private int getSystemAvailMB() {
+        Runtime.getRuntime().gc();
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            logger.error("", e);
+        }
+        return MemoryBudgetController.getSystemAvailMB();
+    }
+
+    private void makeMemoryBudget() {
+        int systemAvailMB = getSystemAvailMB();
+        logger.info("System avail " + systemAvailMB + " MB");
+        int reserve = Math.max(reserveMemoryMB, baseResult.aggrCacheMB / 3);
+        logger.info("Reserve " + reserve + " MB for system basics");
+
+        int budget = systemAvailMB - reserve;
+        if (budget < baseResult.aggrCacheMB) {
+            // make sure we have base aggr cache as minimal
+            budget = baseResult.aggrCacheMB;
+            logger.warn("!!! System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
+        }
+
+        logger.info("Memory Budget is " + budget + " MB");
+        memBudget = new MemoryBudgetController(budget);
+    }
+
+    private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
+        GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
+        GTBuilder baseBuilder = baseCuboid.rebuild();
+        IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
+
+        int mbBefore = getSystemAvailMB();
+        int mbAfter = 0;
+
+        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+        GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+
+        long startTime = System.currentTimeMillis();
+        logger.info("Calculating cuboid " + baseCuboidId);
+
+        int count = 0;
+        for (GTRecord r : aggregationScanner) {
+            if (mbAfter == 0) {
+                mbAfter = getSystemAvailMB();
+            }
+            baseBuilder.write(r);
+            count++;
+        }
+        aggregationScanner.close();
+        baseBuilder.close();
+
+        long timeSpent = System.currentTimeMillis() - startTime;
+        logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+
+        int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
+        int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
+        int mbBaseAggrCache = Math.max((int) (mbBaseAggrCacheOnHeap * 1.1), mbEstimateBaseAggrCache);
+        mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be 10 MB at least
+        logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)");
+
+        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
+    }
+
+    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
+        if (aggrCacheMB <= 0) {
+            aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
+        }
+
+        CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
+        taskCuboidCompleted.incrementAndGet();
+
+        resultCollector.collect(result);
+        return result;
+    }
+
+    private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        final String consumerName = "AggrCache@Cuboid " + cuboidId;
+        MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
+            @Override
+            public int freeUp(int mb) {
+                return 0; // cannot free up on demand
+            }
+
+            @Override
+            public String toString() {
+                return consumerName;
+            }
+        };
+
+        // reserve memory for aggregation cache, can't be larger than the parent
+        memBudget.reserveInsist(consumer, parent.aggrCacheMB);
+        try {
+            return aggregateCuboid(parent, cuboidId);
+        } finally {
+            memBudget.reserve(consumer, 0);
+        }
+    }
+
+    private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
+        Pair<ImmutableBitSet, ImmutableBitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parent.cuboidId);
+        ImmutableBitSet parentDimensions = columnBitSets.getFirst();
+        ImmutableBitSet measureColumns = columnBitSets.getSecond();
+        ImmutableBitSet childDimensions = parentDimensions;
+
+        long mask = Long.highestOneBit(parent.cuboidId);
+        long childCuboidId = cuboidId;
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parent.cuboidId);
+        int index = 0;
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & parent.cuboidId) > 0) {
+                if ((mask & childCuboidId) == 0) {
+                    // this dim will be aggregated
+                    childDimensions = childDimensions.set(index, false);
+                }
+                index++;
+            }
+            mask = mask >> 1;
+        }
+
+        return scanAndAggregateGridTable(parent.table, cuboidId, childDimensions, measureColumns);
+    }
+
+    private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
+        long startTime = System.currentTimeMillis();
+        logger.info("Calculating cuboid " + cuboidId);
+
+        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+        GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
+        GTBuilder builder = newGridTable.rebuild();
+
+        ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
+
+        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
+        int count = 0;
+        try {
+            for (GTRecord record : scanner) {
+                count++;
+                for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
+                    int c = allNeededColumns.trueBitAt(i);
+                    newRecord.set(i, record.get(c));
+                }
+                builder.write(newRecord);
+            }
+
+            // disable sanity check for performance
+            sanityCheck(scanner.getTotalSumForSanityCheck());
+        } finally {
+            scanner.close();
+            builder.close();
+        }
+
+        long timeSpent = System.currentTimeMillis() - startTime;
+        logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+
+        return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
+    }
+
+    //@SuppressWarnings("unused")
+    private void sanityCheck(Object[] totalSum) {
+        // double sum introduces error and causes result not exactly equal
+        for (int i = 0; i < totalSum.length; i++) {
+            if (totalSum[i] instanceof DoubleMutable) {
+                totalSum[i] = Math.round(((DoubleMutable) totalSum[i]).get());
+            }
+        }
+
+        if (totalSumForSanityCheck == null) {
+            totalSumForSanityCheck = totalSum;
+            return;
+        }
+        if (Arrays.equals(totalSumForSanityCheck, totalSum) == false) {
+            throw new IllegalStateException();
+        }
+    }
+
+    // ===========================================================================
+
+    private static class CuboidTask implements Comparable<CuboidTask> {
+        final CuboidResult parent;
+        final long childCuboidId;
+
+        CuboidTask(CuboidResult parent, long childCuboidId) {
+            this.parent = parent;
+            this.childCuboidId = childCuboidId;
+        }
+
+        @Override
+        public int compareTo(CuboidTask o) {
+            long comp = this.childCuboidId - o.childCuboidId;
+            return comp < 0 ? -1 : (comp > 0 ? 1 : 0);
+        }
+    }
+
+    // ============================================================================
+
+    private class InputConverter implements IGTScanner {
+        GTInfo info;
+        GTRecord record;
+        BlockingQueue<List<String>> input;
+
+        public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
+            this.info = info;
+            this.input = input;
+            this.record = new GTRecord(info);
+        }
+
+        @Override
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                List<String> currentObject = null;
+
+                @Override
+                public boolean hasNext() {
+                    try {
+                        currentObject = input.take();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                    return currentObject != null && currentObject.size() > 0;
+                }
+
+                @Override
+                public GTRecord next() {
+                    if (currentObject.size() == 0)
+                        throw new IllegalStateException();
+
+                    buildGTRecord(currentObject, record);
+                    return record;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public GTInfo getInfo() {
+            return info;
+        }
+
+        @Override
+        public int getScannedRowCount() {
+            return 0;
+        }
+
+        @Override
+        public int getScannedRowBlockCount() {
+            return 0;
+        }
+
+        private void buildGTRecord(List<String> row, GTRecord record) {
+            Object[] dimensions = buildKey(row);
+            Object[] metricsValues = buildValue(row);
+            Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+            System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+            System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+            record.setValues(recordValues);
+        }
+
+        private Object[] buildKey(List<String> row) {
+            int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+            Object[] key = new Object[keySize];
+
+            for (int i = 0; i < keySize; i++) {
+                key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+            }
+
+            return key;
+        }
+
+        private Object[] buildValue(List<String> row) {
+
+            Object[] values = new Object[measureCount];
+            MeasureDesc measureDesc = null;
+
+            for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
+                int i = hbaseMeasureRefIndex[position];
+                measureDesc = measureDescs[i];
+
+                Object value = null;
+                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+                FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
+                if (function.isCount() || function.isHolisticCountDistinct()) {
+                    // note for holistic count distinct, this value will be ignored
+                    value = ONE;
+                } else if (flatTableIdx == null) {
+                    value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+                } else if (flatTableIdx.length == 1) {
+                    value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
+                } else {
+
+                    byte[] result = null;
+                    for (int x = 0; x < flatTableIdx.length; x++) {
+                        byte[] split = toBytes(row.get(flatTableIdx[x]));
+                        if (result == null) {
+                            result = Arrays.copyOf(split, split.length);
+                        } else {
+                            byte[] newResult = new byte[result.length + split.length];
+                            System.arraycopy(result, 0, newResult, 0, result.length);
+                            System.arraycopy(split, 0, newResult, result.length, split.length);
+                            result = newResult;
+                        }
+                    }
+                    value = measureCodec.getSerializer(i).valueOf(result);
+                }
+                values[position] = value;
+            }
+            return values;
+        }
+
+        private byte[] toBytes(String v) {
+            return v == null ? null : Bytes.toBytes(v);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
new file mode 100644
index 0000000..d9b0ba6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/MemDiskStore.java
@@ -0,0 +1,679 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import static org.apache.kylin.common.util.MemoryBudgetController.*;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
+import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTRowBlock;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A grid table store ({@link IGTStore}) that buffers written row blocks in a chain of in-memory
+ * chunks, as far as the {@link MemoryBudgetController} permits, while a background thread copies
+ * them to a disk file. Once memory can no longer be reserved, further writes go straight to disk.
+ * Readers take bytes from a memory chunk when one still holds the requested offset and fall back
+ * to the disk file otherwise.
+ * <p>
+ * At most one writer may be open at a time, and every writer restarts from offset 0 (append is not
+ * supported yet). State shared by the writer, readers and the flusher thread is guarded by
+ * {@code lock}.
+ */
+public class MemDiskStore implements IGTStore, Closeable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
+    private static final boolean debug = true;
+
+    private static final int STREAM_BUFFER_SIZE = 8192;
+    private static final int MEM_CHUNK_SIZE_MB = 5;
+
+    private final GTInfo info;
+    private final Object lock; // all public methods that read/write object states are synchronized on this lock
+    private final MemPart memPart;
+    private final DiskPart diskPart;
+    private final boolean delOnClose;
+
+    private Writer ongoingWriter;
+
+    /** Creates a store backed by a fresh temp file that is deleted on close (and on JVM exit). */
+    public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
+        this(info, budgetCtrl, File.createTempFile("MemDiskStore", ""), true);
+    }
+
+    /** Creates a store backed by the given file; the file is not deleted by {@link #close()}. */
+    public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
+        this(info, budgetCtrl, diskFile, false);
+    }
+
+    private MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
+        this.info = info;
+        this.lock = this;
+        this.memPart = new MemPart(budgetCtrl);
+        this.diskPart = new DiskPart(diskFile);
+        this.delOnClose = delOnClose;
+
+        // in case user forget to call close()
+        if (delOnClose)
+            diskFile.deleteOnExit();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /** Starts a writer that replaces the whole content of the store; the shard is ignored. */
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return newWriter(0);
+    }
+
+    /**
+     * Requests a writer positioned at the current end of the store. The shard and {@code fillLast}
+     * are ignored, and because {@link Writer} does not support append yet (it resets to offset 0
+     * and clears both parts), this currently behaves like {@link #rebuild(int)}.
+     */
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return newWriter(length());
+    }
+
+    /** Opens the single allowed writer; throws IllegalStateException if one is still open. */
+    private Writer newWriter(long startOffset) throws IOException {
+        synchronized (lock) {
+            if (ongoingWriter != null)
+                throw new IllegalStateException();
+
+            ongoingWriter = new Writer(startOffset);
+            return ongoingWriter;
+        }
+    }
+
+    /**
+     * Returns a reader over all row blocks in the store. The key range, column selection and
+     * push-down request are ignored by this implementation.
+     */
+    @Override
+    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        synchronized (lock) {
+            return new Reader();
+        }
+    }
+
+    /** Waits for the async flusher, releases memory, closes file channels and (for temp files) deletes the file. */
+    @Override
+    public void close() throws IOException {
+        // synchronized inside the parts close(); the disk part is closed even if the mem part fails
+        try {
+            memPart.close();
+        } finally {
+            diskPart.close();
+        }
+    }
+
+    /** Current logical size of the store in bytes: the furthest offset held in memory or on disk. */
+    public long length() {
+        synchronized (lock) {
+            return Math.max(memPart.tailOffset(), diskPart.tailOffset);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
+    }
+
+    /** Sequential reader over the whole store, from offset 0 up to the current length. */
+    private class Reader implements IGTStoreScanner {
+
+        final DataInputStream din;
+        long readOffset = 0;
+        long memRead = 0;
+        long diskRead = 0;
+        int nReadCalls = 0;
+        boolean closed = false;
+
+        GTRowBlock block = GTRowBlock.allocate(info);
+        GTRowBlock next = null;
+
+        Reader() throws IOException {
+            diskPart.openRead();
+            if (debug)
+                logger.debug(MemDiskStore.this + " read start @ " + readOffset);
+
+            InputStream in = new InputStream() {
+                byte[] tmp = new byte[1];
+                MemChunk memChunk; // chunk last read from, cached between calls
+
+                @Override
+                public int read() throws IOException {
+                    int n = read(tmp, 0, 1);
+                    if (n <= 0)
+                        return -1;
+                    else
+                        return tmp[0] & 0xff; // contract is 0..255; a sign-extended 0xFF would look like EOF
+                }
+
+                @Override
+                public int read(byte[] b, int off, int len) throws IOException {
+                    synchronized (lock) {
+                        nReadCalls++;
+                        if (available() <= 0)
+                            return -1;
+
+                        if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
+                            memChunk = memPart.seekMemChunk(readOffset);
+                        }
+
+                        int lenToGo = Math.min(available(), len);
+
+                        int nRead = 0;
+                        while (lenToGo > 0) {
+                            int n;
+                            if (memChunk != null) {
+                                if (memChunk.headOffset() > readOffset) {
+                                    memChunk = null;
+                                    continue;
+                                }
+                                if (readOffset >= memChunk.tailOffset()) {
+                                    memChunk = memChunk.next;
+                                    continue;
+                                }
+                                int chunkOffset = (int) (readOffset - memChunk.headOffset());
+                                n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
+                                System.arraycopy(memChunk.data, chunkOffset, b, off, n);
+                                memRead += n;
+                            } else {
+                                n = diskPart.read(readOffset, b, off, lenToGo);
+                                if (n <= 0) {
+                                    // nothing readable on disk at this offset (-1 at EOF); stop rather than
+                                    // moving the offsets backwards or spinning forever
+                                    return nRead > 0 ? nRead : -1;
+                                }
+                                diskRead += n;
+                            }
+                            lenToGo -= n;
+                            nRead += n;
+                            off += n;
+                            readOffset += n;
+                        }
+                        return nRead;
+                    }
+                }
+
+                @Override
+                public int available() throws IOException {
+                    synchronized (lock) {
+                        // the remaining length is a long; clamp so stores larger than 2 GB don't overflow
+                        return (int) Math.min(Integer.MAX_VALUE, length() - readOffset);
+                    }
+                }
+            };
+
+            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (next != null)
+                return true;
+
+            try {
+                if (din.available() > 0) {
+                    block.importFrom(din);
+                    next = block;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            return next != null;
+        }
+
+        /** Returns the next row block; the same GTRowBlock instance is reused for every call. */
+        @Override
+        public GTRowBlock next() {
+            if (next == null) {
+                hasNext();
+                if (next == null)
+                    throw new NoSuchElementException();
+            }
+            GTRowBlock r = next;
+            next = null;
+            return r;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                if (closed)
+                    return; // Closeable contract: a second close has no effect (and must not unbalance readerCount)
+                closed = true;
+                din.close();
+                diskPart.closeRead();
+                if (debug)
+                    logger.debug(MemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
+            }
+        }
+
+    }
+
+    /**
+     * The single active writer. Bytes go to the memory part while it accepts them and to the disk
+     * part afterwards; once the memory part refuses a write (returns 0) the writer never returns
+     * to memory.
+     */
+    private class Writer implements IGTStoreWriter {
+
+        final DataOutputStream dout;
+        long writeOffset;
+        long memWrite = 0;
+        long diskWrite = 0;
+        int nWriteCalls;
+        boolean closed = false;
+
+        Writer(long startOffset) throws IOException {
+            writeOffset = 0; // TODO does not support append yet
+            // NOTE(review): constructed under lock (see newWriter), so clear() calls budgetCtrl.reserve()
+            // while holding it, which the note in MemPart.write() warns against
+            memPart.clear();
+            diskPart.clear();
+            diskPart.openWrite(false);
+            if (debug)
+                logger.debug(MemDiskStore.this + " write start @ " + writeOffset);
+
+            memPart.activateMemWrite();
+
+            OutputStream out = new OutputStream() {
+                byte[] tmp = new byte[1];
+                boolean memPartActivated = true;
+
+                @Override
+                public void write(int b) throws IOException {
+                    tmp[0] = (byte) b;
+                    write(tmp, 0, 1);
+                }
+
+                @Override
+                public void write(byte[] bytes, int offset, int length) throws IOException {
+                    // locking happens inside memPart.write() and diskPart.write()
+                    nWriteCalls++;
+                    while (length > 0) {
+                        int n;
+                        if (memPartActivated) {
+                            n = memPart.write(bytes, offset, length, writeOffset);
+                            memWrite += n;
+                            if (n == 0) {
+                                // memory refused (no budget, or deactivated): write to disk from now on
+                                memPartActivated = false;
+                            }
+                        } else {
+                            n = diskPart.write(writeOffset, bytes, offset, length);
+                            diskWrite += n;
+                        }
+                        offset += n;
+                        length -= n;
+                        writeOffset += n;
+                    }
+                }
+            };
+            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            block.export(dout);
+        }
+
+        /**
+         * Flushes buffered bytes and stops memory writes. If the async flusher is still running,
+         * the final cleanup (closing the disk channel, releasing the writer slot) is deferred to
+         * the flusher, which calls this method again when it finishes.
+         */
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                if (!closed) {
+                    dout.close();
+                    memPart.deactivateMemWrite();
+                }
+
+                if (memPart.asyncFlusher == null) {
+                    assert writeOffset == diskPart.tailOffset;
+                    diskPart.closeWrite();
+                    ongoingWriter = null;
+                    if (debug)
+                        logger.debug(MemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
+                } else {
+                    // the asyncFlusher will call this close() again later
+                }
+                closed = true;
+            }
+        }
+    }
+
+    /** A fixed-size byte buffer mirroring the store bytes in [diskOffset, diskOffset + length). */
+    private static class MemChunk {
+        long diskOffset;
+        int length;
+        byte[] data;
+        MemChunk next;
+
+        boolean isFull() {
+            return length == data.length;
+        }
+
+        long headOffset() {
+            return diskOffset;
+        }
+
+        long tailOffset() {
+            return diskOffset + length;
+        }
+
+        int freeSpace() {
+            return data.length - length;
+        }
+    }
+
+    /**
+     * The in-memory part: a linked list of contiguous chunks reserved from the budget controller,
+     * plus the background thread that copies chunk contents to the disk part.
+     */
+    private class MemPart implements Closeable, MemoryConsumer {
+
+        final MemoryBudgetController budgetCtrl;
+
+        // async flush thread checks this flag out of sync block
+        volatile boolean writeActivated;
+        MemChunk firstChunk;
+        MemChunk lastChunk;
+        int chunkCount;
+
+        Thread asyncFlusher;
+        MemChunk asyncFlushChunk;
+        long asyncFlushDiskOffset;
+        // written by the flusher thread outside the lock
+        volatile Throwable asyncFlushException;
+
+        MemPart(MemoryBudgetController budgetCtrl) {
+            this.budgetCtrl = budgetCtrl;
+        }
+
+        long headOffset() {
+            return firstChunk == null ? 0 : firstChunk.headOffset();
+        }
+
+        long tailOffset() {
+            return lastChunk == null ? 0 : lastChunk.tailOffset();
+        }
+
+        /** Walks the chunk list from the head to the chunk containing the given offset (or past it). */
+        public MemChunk seekMemChunk(long diskOffset) {
+            MemChunk c = firstChunk;
+            while (c != null && c.headOffset() <= diskOffset) {
+                if (diskOffset < c.tailOffset())
+                    break;
+                c = c.next;
+            }
+            return c;
+        }
+
+        /**
+         * Appends bytes at the tail of the memory part, reserving a new chunk from the budget when
+         * needed, and schedules them for async flush.
+         *
+         * @return the number of bytes accepted; 0 means memory writes are off, the offset is not
+         *         the tail, or no budget could be reserved (memory writes are then deactivated)
+         */
+        public int write(byte[] bytes, int offset, int length, long diskOffset) {
+            int needMoreMem = 0;
+
+            synchronized (lock) {
+                if (writeActivated == false)
+                    return 0;
+
+                // write is only expected at the tail
+                if (diskOffset != tailOffset())
+                    return 0;
+
+                if (chunkCount == 0 || lastChunk.isFull())
+                    needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
+            }
+
+            // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
+            if (needMoreMem > 0) {
+                try {
+                    budgetCtrl.reserve(this, needMoreMem);
+                } catch (NotEnoughBudgetException ex) {
+                    deactivateMemWrite();
+                    return 0;
+                }
+            }
+
+            synchronized (lock) {
+                if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
+                    MemChunk chunk = new MemChunk();
+                    chunk.diskOffset = diskOffset;
+                    chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
+                    if (chunkCount == 0) {
+                        firstChunk = lastChunk = chunk;
+                    } else {
+                        lastChunk.next = chunk;
+                        lastChunk = chunk;
+                    }
+                    chunkCount++;
+                }
+
+                int n = Math.min(lastChunk.freeSpace(), length);
+                System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
+                lastChunk.length += n;
+
+                if (n > 0)
+                    asyncFlush(lastChunk, diskOffset, n);
+
+                return n;
+            }
+        }
+
+        /** Records where flushing should resume and starts the flusher thread if not running. Called under lock. */
+        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
+            if (asyncFlushChunk == null) {
+                asyncFlushChunk = lastChunk;
+                asyncFlushDiskOffset = diskOffset;
+            }
+
+            if (asyncFlusher == null) {
+                asyncFlusher = new Thread() {
+                    @Override
+                    public void run() {
+                        asyncFlushException = null;
+                        if (debug)
+                            logger.debug(MemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
+                        try {
+                            while (writeActivated) {
+                                flushToDisk();
+                                Thread.sleep(10);
+                            }
+                            flushToDisk();
+
+                            if (debug)
+                                logger.debug(MemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
+
+                            synchronized (lock) {
+                                asyncFlusher = null;
+                                asyncFlushChunk = null;
+                                if (ongoingWriter.closed) {
+                                    ongoingWriter.close(); // call writer.close() again to clean up
+                                }
+                            }
+                        } catch (Throwable ex) {
+                            asyncFlushException = ex;
+                        }
+                    }
+                };
+                asyncFlusher.start();
+            }
+        }
+
+        /** Copies all not-yet-flushed chunk bytes to disk; the disk write itself happens outside the lock. */
+        private void flushToDisk() throws IOException {
+            byte[] data;
+            int offset = 0;
+            int length = 0;
+            int flushedLen = 0;
+
+            while (true) {
+                data = null;
+                synchronized (lock) {
+                    asyncFlushDiskOffset += flushedLen; // bytes written in last loop
+                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
+                        asyncFlushChunk = asyncFlushChunk.next;
+                    }
+                    if (asyncFlushChunk != null) {
+                        data = asyncFlushChunk.data;
+                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
+                        length = asyncFlushChunk.length - offset;
+                    }
+                }
+
+                if (data == null)
+                    break;
+
+                flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
+            }
+        }
+
+        /**
+         * Budget callback: drops chunks from the head of the list until {@code mb} is released,
+         * never dropping the chunk currently being flushed (or any after it).
+         */
+        @Override
+        public int freeUp(int mb) {
+            synchronized (lock) {
+                int mbReleased = 0;
+                while (chunkCount > 0 && mbReleased < mb) {
+                    if (firstChunk == asyncFlushChunk)
+                        break;
+
+                    mbReleased += MEM_CHUNK_SIZE_MB;
+                    chunkCount--;
+                    if (chunkCount == 0) {
+                        firstChunk = lastChunk = null;
+                    } else {
+                        MemChunk next = firstChunk.next;
+                        firstChunk.next = null;
+                        firstChunk = next;
+                    }
+                }
+                return mbReleased;
+            }
+        }
+
+        /** Enables memory writes, but only when the budget controller has a positive total budget. */
+        public void activateMemWrite() {
+            if (budgetCtrl.getTotalBudgetMB() > 0) {
+                writeActivated = true;
+                if (debug)
+                    logger.debug(MemDiskStore.this + " mem write activated");
+            }
+        }
+
+        public void deactivateMemWrite() {
+            writeActivated = false;
+            if (debug)
+                logger.debug(MemDiskStore.this + " mem write de-activated");
+        }
+
+        /** Drops all chunks and returns their reserved budget to the controller. */
+        public void clear() {
+            resetChunks();
+            budgetCtrl.reserve(this, 0);
+        }
+
+        private void resetChunks() {
+            chunkCount = 0;
+            firstChunk = lastChunk = null;
+        }
+
+        /**
+         * Waits for the async flusher to finish, rethrows any failure it recorded, then drops all
+         * chunks and releases the reserved budget.
+         */
+        @Override
+        public void close() throws IOException {
+            Thread flusher;
+            synchronized (lock) {
+                if (asyncFlushException != null)
+                    throwAsyncException(asyncFlushException);
+                flusher = asyncFlusher; // read once under lock; the flusher nulls the field when it ends
+            }
+            if (flusher != null) {
+                try {
+                    flusher.join(); // must not hold lock here, the flusher needs it to finish
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+                    logger.warn("async join interrupted", e);
+                }
+            }
+            synchronized (lock) {
+                if (asyncFlushException != null)
+                    throwAsyncException(asyncFlushException);
+
+                resetChunks();
+            }
+            // reserve() is called outside the lock, per the deadlock note in write()
+            budgetCtrl.reserve(this, 0);
+        }
+
+        private void throwAsyncException(Throwable ex) throws IOException {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else
+                throw new IOException(ex);
+        }
+
+        @Override
+        public String toString() {
+            return MemDiskStore.this.toString();
+        }
+
+    }
+
+    /** The on-disk part: one file accessed through a shared read channel and a positional write channel. */
+    private class DiskPart implements Closeable {
+        final File diskFile;
+        FileChannel writeChannel;
+        FileChannel readChannel;
+        int readerCount = 0; // allow parallel readers
+        long tailOffset;
+
+        DiskPart(File diskFile) throws IOException {
+            this.diskFile = diskFile;
+            this.tailOffset = diskFile.length();
+            if (debug)
+                logger.debug(MemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
+        }
+
+        /** Registers a reader, opening the shared read channel for the first one. */
+        public void openRead() throws IOException {
+            if (readChannel == null) {
+                readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
+            }
+            readerCount++;
+        }
+
+        /** Positional read; returns the byte count, possibly -1 at end of file. */
+        public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+        }
+
+        public void closeRead() throws IOException {
+            closeRead(false);
+        }
+
+        private void closeRead(boolean force) throws IOException {
+            // a forced close (store shutdown) drops every reader, so the count cannot go negative
+            readerCount = force ? 0 : readerCount - 1;
+            if (readerCount == 0) {
+                if (readChannel != null) {
+                    readChannel.close();
+                    readChannel = null;
+                }
+            }
+        }
+
+        /** Opens the write channel; when not appending, the file is deleted and recreated empty. */
+        public void openWrite(boolean append) throws IOException {
+            if (append) {
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
+                tailOffset = diskFile.length();
+            } else {
+                diskFile.delete();
+                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
+                tailOffset = 0;
+            }
+        }
+
+        /** Positional write under lock; extends tailOffset when writing past it. */
+        public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
+            synchronized (lock) {
+                int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
+                tailOffset = Math.max(diskOffset + n, tailOffset);
+                return n;
+            }
+        }
+
+        public void closeWrite() throws IOException {
+            if (writeChannel != null) {
+                writeChannel.close();
+                writeChannel = null;
+            }
+        }
+
+        public void clear() throws IOException {
+            diskFile.delete();
+            tailOffset = 0;
+        }
+
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                closeWrite();
+                closeRead(true);
+                if (delOnClose) {
+                    diskFile.delete();
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
new file mode 100644
index 0000000..38ccccd
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -0,0 +1,268 @@
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.HLLCAggregator;
+import org.apache.kylin.metadata.measure.LDCAggregator;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Scanner that groups the rows of an input scanner by the request's group-by columns and
+ * aggregates the request's metric columns with the request's aggregation functions.
+ * <p>
+ * Grouping keys are the concatenated (max-length) encodings of all returned dimensions, but only
+ * the group-by bytes take part in comparison. For a returned dimension that is not in group-by,
+ * the value reported for a group is therefore the one from the group's first row. Results come
+ * out ordered by the unsigned byte order of the group-by encodings.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class GTAggregateScanner implements IGTScanner {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class);
+
+    final GTInfo info;
+    final ImmutableBitSet dimensions; // dimensions to return, can be more than group by
+    final ImmutableBitSet groupBy;
+    final ImmutableBitSet metrics;
+    final String[] metricsAggrFuncs;
+    final IGTScanner inputScanner;
+    final AggregationCache aggrCache;
+    private boolean aggregated = false; // input is consumed only once, by the first iterator() call
+
+    /** @throws IllegalStateException if the request has no aggregation */
+    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+        if (req.hasAggregation() == false)
+            throw new IllegalStateException();
+
+        this.info = inputScanner.getInfo();
+        this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
+        this.groupBy = req.getAggrGroupBy();
+        this.metrics = req.getAggrMetrics();
+        this.metricsAggrFuncs = req.getAggrMetricsFuncs();
+        this.inputScanner = inputScanner;
+        this.aggrCache = new AggregationCache();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return inputScanner.getScannedRowCount();
+    }
+
+    @Override
+    public int getScannedRowBlockCount() {
+        return inputScanner.getScannedRowBlockCount();
+    }
+
+    @Override
+    public void close() throws IOException {
+        inputScanner.close();
+    }
+
+    /**
+     * Aggregates every input row on the first call, then returns an iterator over the grouped
+     * results. Later calls iterate the same aggregation again instead of re-reading the input,
+     * which would either re-aggregate rows or hit an already exhausted scanner. The returned
+     * iterator reuses one {@link GTRecord} instance across {@code next()} calls.
+     */
+    @Override
+    public Iterator<GTRecord> iterator() {
+        if (!aggregated) {
+            for (GTRecord r : inputScanner) {
+                aggrCache.aggregate(r);
+            }
+            aggregated = true;
+        }
+        return aggrCache.iterator();
+    }
+
+    /** return the estimate memory size of aggregation cache */
+    public long getEstimateSizeOfAggrCache() {
+        return aggrCache.esitmateMemSize();
+    }
+
+    /** Totals every metric over all groups, skipping the (expensive) HLLC and LDC aggregators, which yield null. */
+    public Object[] getTotalSumForSanityCheck() {
+        return aggrCache.calculateTotalSumSanityCheck();
+    }
+
+    /** Sorted map from grouping key to the aggregators of that group. */
+    class AggregationCache {
+        final SortedMap<byte[], MeasureAggregator[]> aggBufMap;
+        final int keyLength;
+        final boolean[] compareMask;
+
+        public AggregationCache() {
+            compareMask = createCompareMask();
+            keyLength = compareMask.length;
+            aggBufMap = Maps.newTreeMap(new Comparator<byte[]>() {
+                @Override
+                public int compare(byte[] o1, byte[] o2) {
+                    // profiler shows a length precondition check here is slow, so keys are trusted
+                    // to be keyLength long; only group-by bytes (mask true) are compared, unsigned
+                    for (int i = 0; i < keyLength; ++i) {
+                        if (compareMask[i]) {
+                            int diff = (o1[i] & 0xff) - (o2[i] & 0xff);
+                            if (diff != 0) {
+                                return diff;
+                            }
+                        }
+                    }
+                    return 0;
+                }
+            });
+        }
+
+        /** One flag per key byte: true where the byte belongs to a group-by column. */
+        private boolean[] createCompareMask() {
+            int keyLength = 0;
+            for (int i = 0; i < dimensions.trueBitCount(); i++) {
+                int c = dimensions.trueBitAt(i);
+                keyLength += info.codeSystem.maxCodeLength(c);
+            }
+
+            boolean[] mask = new boolean[keyLength];
+            int p = 0;
+            for (int i = 0; i < dimensions.trueBitCount(); i++) {
+                int c = dimensions.trueBitAt(i);
+                int l = info.codeSystem.maxCodeLength(c);
+                boolean m = groupBy.get(c);
+                for (int j = 0; j < l; j++) {
+                    mask[p++] = m;
+                }
+            }
+            return mask;
+        }
+
+        /** Concatenates the returned dimensions' bytes, each in a slot of its column's max code length. */
+        private byte[] createKey(GTRecord record) {
+            byte[] result = new byte[keyLength];
+            int offset = 0;
+            for (int i = 0; i < dimensions.trueBitCount(); i++) {
+                int c = dimensions.trueBitAt(i);
+                final ByteArray byteArray = record.cols[c];
+                final int columnLength = info.codeSystem.maxCodeLength(c);
+                System.arraycopy(byteArray.array(), byteArray.offset(), result, offset, byteArray.length());
+                offset += columnLength;
+            }
+            assert offset == result.length;
+            return result;
+        }
+
+        /** Folds one record's decoded metric values into the aggregators of its group. */
+        void aggregate(GTRecord r) {
+            final byte[] key = createKey(r);
+            MeasureAggregator[] aggrs = aggBufMap.get(key);
+            if (aggrs == null) {
+                aggrs = newAggregators();
+                aggBufMap.put(key, aggrs);
+            }
+            for (int i = 0; i < aggrs.length; i++) {
+                int col = metrics.trueBitAt(i);
+                Object value = info.codeSystem.decodeColumnValue(col, r.cols[col].asBuffer());
+                aggrs[i].aggregate(value);
+            }
+        }
+
+        private MeasureAggregator[] newAggregators() {
+            return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
+        }
+
+        public Object[] calculateTotalSumSanityCheck() {
+            MeasureAggregator[] totalSum = newAggregators();
+
+            // skip expensive aggregation
+            for (int i = 0; i < totalSum.length; i++) {
+                if (totalSum[i] instanceof HLLCAggregator || totalSum[i] instanceof LDCAggregator)
+                    totalSum[i] = null;
+            }
+
+            for (MeasureAggregator[] entry : aggBufMap.values()) {
+                for (int i = 0; i < totalSum.length; i++) {
+                    if (totalSum[i] != null)
+                        totalSum[i].aggregate(entry[i].getState());
+                }
+            }
+            Object[] result = new Object[totalSum.length];
+            for (int i = 0; i < totalSum.length; i++) {
+                if (totalSum[i] != null)
+                    result[i] = totalSum[i].getState();
+            }
+            return result;
+        }
+
+        /** Estimates the cache size from its first entry times the entry count. */
+        public long esitmateMemSize() {
+            if (aggBufMap.isEmpty())
+                return 0;
+
+            byte[] sampleKey = aggBufMap.firstKey();
+            MeasureAggregator<?>[] sampleValue = aggBufMap.get(sampleKey);
+            return estimateSizeOfAggrCache(sampleKey, sampleValue, aggBufMap.size());
+        }
+
+        /** Iterates the groups in key order, re-filling and returning one shared record per group. */
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                final Iterator<Entry<byte[], MeasureAggregator[]>> it = aggBufMap.entrySet().iterator();
+
+                final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
+                final GTRecord secondRecord = new GTRecord(info);
+
+                @Override
+                public boolean hasNext() {
+                    return it.hasNext();
+                }
+
+                @Override
+                public GTRecord next() {
+                    Entry<byte[], MeasureAggregator[]> entry = it.next();
+                    create(entry.getKey(), entry.getValue());
+                    return secondRecord;
+                }
+
+                private void create(byte[] key, MeasureAggregator[] value) {
+                    int offset = 0;
+                    for (int i = 0; i < dimensions.trueBitCount(); i++) {
+                        int c = dimensions.trueBitAt(i);
+                        final int columnLength = info.codeSystem.maxCodeLength(c);
+                        secondRecord.set(c, new ByteArray(key, offset, columnLength));
+                        offset += columnLength;
+                    }
+                    metricsBuf.clear();
+                    for (int i = 0; i < value.length; i++) {
+                        int col = metrics.trueBitAt(i);
+                        int pos = metricsBuf.position();
+                        info.codeSystem.encodeColumnValue(col, value[i].getState(), metricsBuf);
+                        secondRecord.cols[col].set(metricsBuf.array(), pos, metricsBuf.position() - pos);
+                    }
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+    public static long estimateSizeOfAggrCache(byte[] keySample, MeasureAggregator<?>[] aggrSample, int size) {
+        // Aggregation cache is basically a tree map. The tree map entry overhead is
+        // - 40 according to http://java-performance.info/memory-consumption-of-java-data-types-2/
+        // - 41~52 according to AggregationCacheMemSizeTest
+        return (estimateSizeOf(keySample) + estimateSizeOf(aggrSample) + 64) * size;
+    }
+
+    public static long estimateSizeOf(MeasureAggregator[] aggrs) {
+        // size of array, AggregationCacheMemSizeTest reports 4 for [0], 12 for [1], 12 for [2], 20 for [3] etc..
+        // Memory alignment to 8 bytes
+        long est = (aggrs.length + 1) / 2 * 8 + 4 + (4 /* extra */);
+        for (MeasureAggregator aggr : aggrs) {
+            if (aggr != null)
+                est += aggr.getMemBytesEstimate();
+        }
+        return est;
+    }
+
+    public static long estimateSizeOf(byte[] bytes) {
+        // AggregationCacheMemSizeTest reports 20 for byte[10] and 20 again for byte[16]
+        // Memory alignment to 8 bytes
+        return (bytes.length + 7) / 8 * 8 + 4 + (4 /* extra */);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
new file mode 100644
index 0000000..7308b5c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java
@@ -0,0 +1,74 @@
+package org.apache.kylin.gridtable;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+
+import org.apache.kylin.gridtable.IGTStore.IGTStoreWriter;
+
+/**
+ * Writes {@link GTRecord}s into one shard of an {@link IGTStore}. Records are buffered in a single
+ * {@link GTRowBlock} which is handed to the store writer whenever it becomes full, on an explicit
+ * {@link #flush()}, and on {@link #close()}.
+ */
+public class GTBuilder implements Closeable, Flushable {
+
+    @SuppressWarnings("unused")
+    final private GTInfo info;
+    final private IGTStoreWriter storeWriter;
+
+    final private GTRowBlock block;
+    final private GTRowBlock.Writer blockWriter;
+
+    private int writtenRowCount;
+    private int writtenRowBlockCount;
+
+    /** Creates a builder that writes the shard via {@code store.rebuild(shard)}. */
+    GTBuilder(GTInfo info, int shard, IGTStore store) throws IOException {
+        this(info, shard, store, false);
+    }
+
+    /**
+     * @param info   grid table info used to allocate the row block
+     * @param shard  shard to write
+     * @param store  target store
+     * @param append true to obtain the writer via {@code store.append(shard, blockWriter)},
+     *               false to obtain it via {@code store.rebuild(shard)}
+     */
+    GTBuilder(GTInfo info, int shard, IGTStore store, boolean append) throws IOException {
+        this.info = info;
+
+        block = GTRowBlock.allocate(info);
+        blockWriter = block.getWriter();
+        if (append) {
+            storeWriter = store.append(shard, blockWriter);
+            // NOTE(review): store.append presumably preloads the shard's last block into blockWriter;
+            // if that block is already full, start a fresh one before writing more rows
+            if (block.isFull()) {
+                blockWriter.clearForNext();
+            }
+        } else {
+            storeWriter = store.rebuild(shard);
+        }
+    }
+
+    /** Appends one record to the current block and flushes the block once it is full. */
+    public void write(GTRecord r) throws IOException {
+        blockWriter.append(r);
+        writtenRowCount++;
+
+        if (block.isFull()) {
+            flush();
+        }
+    }
+
+    /**
+     * Hands the current block to the store writer. The block is cleared only when it is full, so a
+     * partially filled block is not reset by this call.
+     */
+    @Override
+    public void flush() throws IOException {
+        blockWriter.readyForFlush();
+        storeWriter.write(block);
+        writtenRowBlockCount++;
+        if (block.isFull()) {
+            blockWriter.clearForNext();
+        }
+    }
+
+    /**
+     * Flushes pending rows, if any, and closes the store writer. The store writer is closed even
+     * when the final flush fails, so it is never leaked.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (block.isEmpty() == false) {
+                flush();
+            }
+        } finally {
+            storeWriter.close();
+        }
+    }
+
+    /** Returns the number of records passed to {@link #write(GTRecord)}. */
+    public int getWrittenRowCount() {
+        return writtenRowCount;
+    }
+
+    /** Returns the number of times a block was handed to the store writer. */
+    public int getWrittenRowBlockCount() {
+        return writtenRowBlockCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
new file mode 100644
index 0000000..21fb167
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTFilterScanner.java
@@ -0,0 +1,100 @@
+package org.apache.kylin.gridtable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+public class GTFilterScanner implements IGTScanner {
+
+    final private IGTScanner inputScanner;
+    final private TupleFilter filter;
+    final private IEvaluatableTuple oneTuple; // avoid instance creation
+    
+    private GTRecord next = null;
+
+    public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
+        this.inputScanner = inputScanner;
+        this.filter = req.getFilterPushDown();
+        this.oneTuple = new IEvaluatableTuple() {
+            @Override
+            public Object getValue(TblColRef col) {
+                return next.get(col.getColumnDesc().getZeroBasedIndex());
+            }
+        };
+
+        if (TupleFilter.isEvaluableRecursively(filter) == false)
+            throw new IllegalArgumentException();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return inputScanner.getInfo();
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return inputScanner.getScannedRowCount();
+    }
+
+    @Override
+    public int getScannedRowBlockCount() {
+        return inputScanner.getScannedRowBlockCount();
+    }
+
+    @Override
+    public void close() throws IOException {
+        inputScanner.close();
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return new Iterator<GTRecord>() {
+            
+            private Iterator<GTRecord> inputIterator = inputScanner.iterator();
+
+            @Override
+            public boolean hasNext() {
+                if (next != null)
+                    return true;
+
+                IFilterCodeSystem<ByteArray> filterCodeSystem = GTUtil.wrap(getInfo().codeSystem.getComparator());
+
+                while (inputIterator.hasNext()) {
+                    next = inputIterator.next();
+                    if (filter != null && filter.evaluate(oneTuple, filterCodeSystem) == false) {
+                        continue;
+                    }
+                    return true;
+                }
+                next = null;
+                return false;
+            }
+
+            @Override
+            public GTRecord next() {
+                // fetch next record
+                if (next == null) {
+                    hasNext();
+                    if (next == null)
+                        throw new NoSuchElementException();
+                }
+
+                GTRecord result = next;
+                next = null;
+                return result;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
new file mode 100644
index 0000000..87d7811
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -0,0 +1,246 @@
+package org.apache.kylin.gridtable;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * Describes the schema and physical layout of a grid table: code system, column types, primary
+ * key, column blocks, row block size and shard count. Instances can only be created through
+ * {@link Builder}, whose {@code build()} validates the configuration.
+ */
+public class GTInfo {
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    String tableName;
+    IGTCodeSystem codeSystem;
+
+    // column schema
+    int nColumns;
+    DataType[] colTypes;
+    ImmutableBitSet colAll;
+    ImmutableBitSet colPreferIndex;
+    transient TblColRef[] colRefs;
+
+    // grid info
+    ImmutableBitSet primaryKey; // order by, uniqueness is not required
+    ImmutableBitSet[] colBlocks; // primary key must be the first column block
+    ImmutableBitSet colBlocksAll;
+    int rowBlockSize; // 0: disable row block
+
+    // sharding
+    int nShards; // 0: no sharding
+
+    // must create from builder
+    private GTInfo() {
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public IGTCodeSystem getCodeSystem() {
+        return codeSystem;
+    }
+
+    public int getColumnCount() {
+        return nColumns;
+    }
+
+    public DataType getColumnType(int i) {
+        return colTypes[i];
+    }
+
+    public ImmutableBitSet getPrimaryKey() {
+        return primaryKey;
+    }
+
+    public boolean isShardingEnabled() {
+        return nShards > 0;
+    }
+
+    public boolean isRowBlockEnabled() {
+        return rowBlockSize > 0;
+    }
+
+    public int getRowBlockSize() {
+        return rowBlockSize;
+    }
+
+    /** Returns the sum of the maximum code lengths of all columns. */
+    public int getMaxRecordLength() {
+        return getMaxColumnLength(colAll);
+    }
+
+    /** Returns the sum of the maximum code lengths of the selected columns. */
+    public int getMaxColumnLength(ImmutableBitSet selectedCols) {
+        int result = 0;
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+            result += codeSystem.maxCodeLength(c);
+        }
+        return result;
+    }
+
+    /** Returns the largest maximum code length of any single column. */
+    public int getMaxColumnLength() {
+        int max = 0;
+        for (int i = 0; i < nColumns; i++)
+            max = Math.max(max, codeSystem.maxCodeLength(i));
+        return max;
+    }
+
+    /**
+     * Returns the indexes of the column blocks that contain at least one of the given columns.
+     * A null argument selects all columns.
+     */
+    public ImmutableBitSet selectColumnBlocks(ImmutableBitSet columns) {
+        if (columns == null)
+            columns = colAll;
+
+        BitSet result = new BitSet();
+        for (int i = 0; i < colBlocks.length; i++) {
+            ImmutableBitSet cb = colBlocks[i];
+            if (cb.intersects(columns)) {
+                result.set(i);
+            }
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    /** Returns the column reference for column {@code i}, creating and caching it on first use. */
+    public TblColRef colRef(int i) {
+        if (colRefs == null) {
+            colRefs = new TblColRef[nColumns];
+        }
+        if (colRefs[i] == null) {
+            colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString());
+        }
+        return colRefs[i];
+    }
+
+    /**
+     * @throws IllegalArgumentException if {@code ref} differs from the column reference this table
+     *         builds for the same column index
+     */
+    public void validateColRef(TblColRef ref) {
+        TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex());
+        if (expected.equals(ref) == false)
+            throw new IllegalArgumentException("Unexpected column ref " + ref + ", expected " + expected);
+    }
+
+    /** Checks the required settings, initializes the code system, and normalizes column blocks. */
+    void validate() {
+
+        if (codeSystem == null)
+            throw new IllegalStateException("code system is required");
+
+        if (colTypes == null)
+            throw new IllegalStateException("columns are required");
+
+        // a missing primary key used to surface as a NullPointerException here
+        if (primaryKey == null || primaryKey.cardinality() == 0)
+            throw new IllegalStateException("primary key is required and must not be empty");
+
+        codeSystem.init(this);
+
+        validateColumnBlocks();
+    }
+
+    private void validateColumnBlocks() {
+        colAll = new ImmutableBitSet(0, nColumns);
+
+        // default layout: primary key block followed by a block of all remaining columns
+        if (colBlocks == null) {
+            colBlocks = new ImmutableBitSet[2];
+            colBlocks[0] = primaryKey;
+            colBlocks[1] = colAll.andNot(primaryKey);
+        }
+
+        colBlocksAll = new ImmutableBitSet(0, colBlocks.length);
+
+        if (colPreferIndex == null)
+            colPreferIndex = ImmutableBitSet.EMPTY;
+
+        // column blocks must not overlap
+        for (int i = 0; i < colBlocks.length; i++) {
+            for (int j = i + 1; j < colBlocks.length; j++) {
+                if (colBlocks[i].intersects(colBlocks[j]))
+                    throw new IllegalStateException("column blocks " + i + " and " + j + " overlap");
+            }
+        }
+
+        // column block must cover all columns
+        ImmutableBitSet merge = ImmutableBitSet.EMPTY;
+        for (int i = 0; i < colBlocks.length; i++) {
+            merge = merge.or(colBlocks[i]);
+        }
+        if (merge.equals(colAll) == false)
+            throw new IllegalStateException("column blocks must cover all " + nColumns + " columns");
+
+        // primary key must be the first column block
+        if (primaryKey.equals(colBlocks[0]) == false)
+            throw new IllegalStateException("primary key must be the first column block");
+
+        // drop empty column block
+        LinkedList<ImmutableBitSet> list = new LinkedList<ImmutableBitSet>(Arrays.asList(colBlocks));
+        Iterator<ImmutableBitSet> it = list.iterator();
+        while (it.hasNext()) {
+            ImmutableBitSet cb = it.next();
+            if (cb.isEmpty())
+                it.remove();
+        }
+        colBlocks = list.toArray(new ImmutableBitSet[list.size()]);
+    }
+
+    /** Fluent builder for {@link GTInfo}; {@link #build()} validates the result. */
+    public static class Builder {
+        final GTInfo info;
+
+        private Builder() {
+            this.info = new GTInfo();
+        }
+
+        /** optional */
+        public Builder setTableName(String name) {
+            info.tableName = name;
+            return this;
+        }
+
+        /** required */
+        public Builder setCodeSystem(IGTCodeSystem cs) {
+            info.codeSystem = cs;
+            return this;
+        }
+
+        /** required */
+        public Builder setColumns(DataType... colTypes) {
+            info.nColumns = colTypes.length;
+            info.colTypes = colTypes;
+            return this;
+        }
+
+        /** required */
+        public Builder setPrimaryKey(ImmutableBitSet primaryKey) {
+            info.primaryKey = primaryKey;
+            return this;
+        }
+
+        /** optional; the array is copied, so later changes by the caller do not affect the info */
+        public Builder enableColumnBlock(ImmutableBitSet[] columnBlocks) {
+            info.colBlocks = Arrays.copyOf(columnBlocks, columnBlocks.length);
+            return this;
+        }
+
+        /** optional */
+        public Builder enableRowBlock(int rowBlockSize) {
+            info.rowBlockSize = rowBlockSize;
+            return this;
+        }
+
+        /** optional */
+        public Builder enableSharding(int nShards) {
+            info.nShards = nShards;
+            return this;
+        }
+
+        /** optional */
+        public Builder setColumnPreferIndex(ImmutableBitSet colPreferIndex) {
+            info.colPreferIndex = colPreferIndex;
+            return this;
+        }
+
+        /**
+         * Validates and returns the configured info.
+         *
+         * @throws IllegalStateException if a required setting is missing or column blocks are inconsistent
+         */
+        public GTInfo build() {
+            info.validate();
+            return info;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java
new file mode 100644
index 0000000..4f99108
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndex.java
@@ -0,0 +1,205 @@
+package org.apache.kylin.gridtable;
+
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+
+/**
+ * A thread-safe inverted index of row blocks in memory.
+ * 
+ * Note function not() must return all blocks, because index only know what block contains a value,
+ * but not sure what block does not contain a value.
+ * 
+ * Blocks whose id is at or beyond the indexed count are always reported as candidates, since nothing
+ * is known about them yet.
+ * 
+ * @author yangli9
+ */
+public class GTInvertedIndex {
+
+    private final GTInfo info;
+    private final ImmutableBitSet colPreferIndex;
+    private final ImmutableBitSet colBlocks;
+    private final GTInvertedIndexOfColumn[] index; // for each column
+
+    // one past the largest block id indexed so far; only ever grows (see advanceIndexedBlocks)
+    private volatile int nIndexedBlocks;
+
+    public GTInvertedIndex(GTInfo info) {
+        this.info = info;
+        this.colPreferIndex = info.colPreferIndex;
+        this.colBlocks = info.selectColumnBlocks(colPreferIndex);
+
+        index = new GTInvertedIndexOfColumn[info.getColumnCount()];
+        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+            int c = colPreferIndex.trueBitAt(i);
+            index[c] = new GTInvertedIndexOfColumn(info.codeSystem.getComparator());
+        }
+    }
+
+    /** Indexes the distinct values of every prefer-index column found in the given block. */
+    public void add(GTRowBlock block) {
+
+        @SuppressWarnings("unchecked")
+        Set<ByteArray>[] distinctValues = new Set[info.getColumnCount()];
+        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+            int c = colPreferIndex.trueBitAt(i);
+            distinctValues[c] = new HashSet<ByteArray>();
+        }
+
+        GTRowBlock.Reader reader = block.getReader(colBlocks);
+        GTRecord record = new GTRecord(info);
+        while (reader.hasNext()) {
+            reader.fetchNext(record);
+            for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+                int c = colPreferIndex.trueBitAt(i);
+                // The same record is refilled for every row, and its cells are presumably ByteArray
+                // views that are reset in place; store a private copy so later rows cannot mutate
+                // an element that is already in the set.
+                distinctValues[c].add(record.get(c).copy());
+            }
+        }
+
+        int blockId = block.getSequenceId();
+        for (int i = 0; i < colPreferIndex.trueBitCount(); i++) {
+            int c = colPreferIndex.trueBitAt(i);
+            index[c].add(distinctValues[c], blockId);
+        }
+
+        advanceIndexedBlocks(blockId + 1);
+    }
+
+    // atomic "max" update: concurrent add() calls must never move nIndexedBlocks backwards
+    private synchronized void advanceIndexedBlocks(int n) {
+        if (n > nIndexedBlocks)
+            nIndexedBlocks = n;
+    }
+
+    /** Returns ids of the indexed blocks that may contain rows matching the filter. */
+    public ConciseSet filter(TupleFilter filter) {
+        return filter(filter, nIndexedBlocks);
+    }
+
+    /**
+     * Returns ids of blocks that may contain rows matching the filter. Blocks in
+     * [indexed count, totalBlocks) are always included because they have not been indexed yet.
+     */
+    public ConciseSet filter(TupleFilter filter, int totalBlocks) {
+        // number of indexed blocks may increase as we do evaluation
+        int indexedBlocks = nIndexedBlocks;
+
+        Evaluator evaluator = new Evaluator(indexedBlocks);
+        ConciseSet r = evaluator.evaluate(filter);
+
+        // add blocks that have not been indexed
+        for (int i = indexedBlocks; i < totalBlocks; i++) {
+            r.add(i);
+        }
+
+        return r;
+    }
+
+    /** Evaluates a filter tree against a fixed snapshot of the number of indexed blocks. */
+    private class Evaluator {
+        private int indexedBlocks;
+
+        Evaluator(int indexedBlocks) {
+            this.indexedBlocks = indexedBlocks;
+        }
+
+        public ConciseSet evaluate(TupleFilter filter) {
+            if (filter == null) {
+                return all();
+            }
+
+            if (filter instanceof LogicalTupleFilter)
+                return evalLogical((LogicalTupleFilter) filter);
+
+            if (filter instanceof CompareTupleFilter)
+                return evalCompare((CompareTupleFilter) filter);
+
+            // unable to evaluate
+            return all();
+        }
+
+        @SuppressWarnings("unchecked")
+        private ConciseSet evalCompare(CompareTupleFilter filter) {
+            // a compare without a column, or on a column this index does not cover, prunes nothing
+            if (filter.getColumn() == null)
+                return all();
+            int col = col(filter);
+            if (col < 0 || col >= index.length || index[col] == null)
+                return all();
+
+            switch (filter.getOperator()) {
+            case ISNULL:
+                return index[col].getNull();
+            case ISNOTNULL:
+                return all();
+            case EQ:
+                return index[col].getEquals((ByteArray) filter.getFirstValue());
+            case NEQ:
+                return all();
+            case IN:
+                return index[col].getIn((Iterable<ByteArray>) filter.getValues());
+            case NOTIN:
+                return all();
+            case LT:
+                return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), false);
+            case LTE:
+                return index[col].getRange(null, false, (ByteArray) filter.getFirstValue(), true);
+            case GT:
+                return index[col].getRange((ByteArray) filter.getFirstValue(), false, null, false);
+            case GTE:
+                return index[col].getRange((ByteArray) filter.getFirstValue(), true, null, false);
+            default:
+                throw new IllegalStateException("Unsupported operator " + filter.getOperator());
+            }
+        }
+
+        private ConciseSet evalLogical(LogicalTupleFilter filter) {
+            List<? extends TupleFilter> children = filter.getChildren();
+
+            switch (filter.getOperator()) {
+            case AND:
+                return evalLogicalAnd(children);
+            case OR:
+                return evalLogicalOr(children);
+            case NOT:
+                return evalLogicalNot(children);
+            default:
+                throw new IllegalStateException("Unsupported operator " + filter.getOperator());
+            }
+        }
+
+        private ConciseSet evalLogicalAnd(List<? extends TupleFilter> children) {
+            ConciseSet set = all();
+
+            for (TupleFilter c : children) {
+                ConciseSet t = evaluate(c);
+                if (t == null)
+                    continue; // because it's AND
+
+                set.retainAll(t);
+            }
+            return set;
+        }
+
+        private ConciseSet evalLogicalOr(List<? extends TupleFilter> children) {
+            ConciseSet set = new ConciseSet();
+
+            for (TupleFilter c : children) {
+                ConciseSet t = evaluate(c);
+                if (t == null)
+                    return null; // because it's OR
+
+                set.addAll(t);
+            }
+            return set;
+        }
+
+        // the index cannot tell which blocks lack a value, so NOT prunes nothing
+        private ConciseSet evalLogicalNot(List<? extends TupleFilter> children) {
+            return all();
+        }
+
+        // all indexed blocks: {0, ..., indexedBlocks - 1}
+        private ConciseSet all() {
+            return not(new ConciseSet());
+        }
+
+        // setting bit indexedBlocks then complementing flips every bit below it
+        private ConciseSet not(ConciseSet set) {
+            set.add(indexedBlocks);
+            set.complement();
+            return set;
+        }
+
+        private int col(CompareTupleFilter filter) {
+            return filter.getColumn().getColumnDesc().getZeroBasedIndex();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java
new file mode 100644
index 0000000..4b094fd
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInvertedIndexOfColumn.java
@@ -0,0 +1,115 @@
+package org.apache.kylin.gridtable;
+
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+import java.util.NavigableMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.kylin.common.util.ByteArray;
+
+import com.google.common.collect.Maps;
+
+public class GTInvertedIndexOfColumn {
+
+    final private IGTComparator comparator;
+    final private ReentrantReadWriteLock rwLock;
+
+    private int nBlocks;
+    private NavigableMap<ByteArray, ConciseSet> rangeIndex;
+    private ConciseSet nullIndex;
+
+    public GTInvertedIndexOfColumn(IGTComparator comparator) {
+        this.comparator = comparator;
+        this.rwLock = new ReentrantReadWriteLock();
+        this.rangeIndex = Maps.newTreeMap(comparator);
+        this.nullIndex = new ConciseSet();
+    }
+
+    public void add(Iterable<ByteArray> codes, int blockId) {
+        rwLock.writeLock().lock();
+        try {
+            for (ByteArray code : codes) {
+                if (comparator.isNull(code)) {
+                    nullIndex.add(blockId);
+                    continue;
+                }
+                ConciseSet set = rangeIndex.get(code);
+                if (set == null) {
+                    set = new ConciseSet();
+                    rangeIndex.put(code.copy(), set);
+                }
+                set.add(blockId);
+            }
+
+            if (blockId >= nBlocks) {
+                nBlocks = blockId + 1;
+            }
+
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    public ConciseSet getNull() {
+        rwLock.readLock().lock();
+        try {
+            return nullIndex.clone();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    public ConciseSet getEquals(ByteArray code) {
+        rwLock.readLock().lock();
+        try {
+            ConciseSet set = rangeIndex.get(code);
+            if (set == null)
+                return new ConciseSet();
+            else
+                return set.clone();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    public ConciseSet getIn(Iterable<ByteArray> codes) {
+        rwLock.readLock().lock();
+        try {
+            ConciseSet r = new ConciseSet();
+            for (ByteArray code : codes) {
+                ConciseSet set = rangeIndex.get(code);
+                if (set != null)
+                    r.addAll(set);
+            }
+            return r;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    public ConciseSet getRange(ByteArray from, boolean fromInclusive, ByteArray to, boolean toInclusive) {
+        rwLock.readLock().lock();
+        try {
+            ConciseSet r = new ConciseSet();
+            if (from == null && to == null) {
+                r.add(nBlocks);
+                r.complement();
+                return r;
+            }
+            NavigableMap<ByteArray, ConciseSet> subMap;
+            if (from == null) {
+                subMap = rangeIndex.headMap(to, toInclusive);
+            } else if (to == null) {
+                subMap = rangeIndex.tailMap(from, fromInclusive);
+            } else {
+                subMap = rangeIndex.subMap(from, fromInclusive, to, toInclusive);
+            }
+            for (ConciseSet set : subMap.values()) {
+                r.addAll(set);
+            }
+            return r;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+}