You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/04/11 00:48:00 UTC

[23/34] incubator-kylin git commit: KYLIN-655, milestone, IGTScanner done, pending transform to tuple

KYLIN-655, milestone, IGTScanner done, pending transform to tuple


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5e633b79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5e633b79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5e633b79

Branch: refs/heads/streaming-localdict
Commit: 5e633b791b7d13887e0fdbbc140f22bb467be4f7
Parents: 84183f9
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Apr 9 11:10:05 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Thu Apr 9 11:10:05 2015 +0800

----------------------------------------------------------------------
 .../storage/cube/CubeHBaseReadonlyStore.java    |  48 ++++--
 .../apache/kylin/storage/cube/CubeScanner.java  | 145 +++++++++++++++----
 .../storage/gridtable/GTAggregateScanner.java   |   3 +
 .../storage/gridtable/GTFilterScanner.java      | 113 +++------------
 .../storage/gridtable/GTInvertedIndex.java      |   2 +-
 .../kylin/storage/gridtable/GTRawScanner.java   | 136 +++++++++++++++++
 .../kylin/storage/gridtable/GTRowBlock.java     |  18 ++-
 .../kylin/storage/gridtable/GridTable.java      |   6 +-
 .../gridtable/memstore/GTSimpleMemStore.java    |   2 +-
 9 files changed, 332 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
index e8d5d23..6af9b99 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
@@ -5,6 +5,8 @@ import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
@@ -16,7 +18,7 @@ import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
@@ -25,7 +27,6 @@ import org.apache.kylin.storage.gridtable.GTRowBlock;
 import org.apache.kylin.storage.gridtable.GTRowBlock.Writer;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.IGTStore;
-import org.apache.kylin.storage.hbase.CubeSegmentTupleIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,18 +69,23 @@ public class CubeHBaseReadonlyStore implements IGTStore {
     }
 
     @Override
-    public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+    public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColumnBlocks, GTScanRequest additionalPushDown) throws IOException {
         // TODO enable coprocessor
 
+        // primary key (also the 0th column block) is always selected
+        final BitSet selectedColBlocks = (BitSet) selectedColumnBlocks.clone();
+        selectedColBlocks.set(0);
+
         // globally shared connection, does not require close
         HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-        
+
         final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
         final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
 
         Scan hbaseScan = buildScan(pkStart, pkEnd, hbaseColumns);
         final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
         final Iterator<Result> iterator = scanner.iterator();
+        final GTRowBlock oneBlock = new GTRowBlock(info); // avoid object creation
 
         return new IGTStoreScanner() {
 
@@ -90,12 +96,22 @@ public class CubeHBaseReadonlyStore implements IGTStore {
 
             @Override
             public GTRowBlock next() {
+                // row block is always disabled for cubes, row block contains only one record
                 Result result = iterator.next();
-                result.getRow();
-                for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) {
-                    result.getColumnLatestCell(hbaseColumn.getFirst(), hbaseColumn.getSecond());
+
+                // dimensions, set to primary key, also the 0th column block
+                byte[] rowkey = result.getRow();
+                oneBlock.getPrimaryKey().set(rowkey, RowConstants.ROWKEY_CUBOIDID_LEN, rowkey.length - RowConstants.ROWKEY_CUBOIDID_LEN);
+                oneBlock.getCellBlock(0).set(rowkey, RowConstants.ROWKEY_CUBOIDID_LEN, rowkey.length - RowConstants.ROWKEY_CUBOIDID_LEN);
+
+                // metrics
+                int hbaseColIdx = 0;
+                for (int colBlockIdx = selectedColBlocks.nextSetBit(1); colBlockIdx >= 0; colBlockIdx = selectedColBlocks.nextSetBit(colBlockIdx + 1)) {
+                    Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(hbaseColIdx++);
+                    Cell cell = result.getColumnLatestCell(hbaseColumn.getFirst(), hbaseColumn.getSecond());
+                    oneBlock.getCellBlock(colBlockIdx).set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                 }
-                return null;
+                return oneBlock;
             }
 
             @Override
@@ -123,15 +139,25 @@ public class CubeHBaseReadonlyStore implements IGTStore {
             scan.addColumn(byteFamily, byteQualifier);
         }
 
-        scan.setStartRow(pkStart.copy().array());
-        scan.setStopRow(pkEnd.copy().array());
+        scan.setStartRow(makeRowKeyToScan(pkStart));
+        scan.setStopRow(makeRowKeyToScan(pkEnd));
         return scan;
     }
 
+    private byte[] makeRowKeyToScan(ByteArray pk) {
+        if (pk == null || pk.array() == null)
+            return HConstants.EMPTY_BYTE_ARRAY; // from the very beginning, or to the end
+
+        byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_CUBOIDID_LEN];
+        System.arraycopy(cuboid.getBytes(), 0, buf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_CUBOIDID_LEN, pk.length());
+        return buf;
+    }
+
     private List<Pair<byte[], byte[]>> makeHBaseColumns(BitSet selectedColBlocks) {
         List<Pair<byte[], byte[]>> result = Lists.newArrayList();
 
-        int colBlockIdx = 0;
+        int colBlockIdx = 1; // start from 1; the 0th column block is primary key which maps to rowkey
         HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping();
         for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) {
             byte[] byteFamily = Bytes.toBytes(familyDesc.getName());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
index a8a0547..ea5526d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeScanner.java
@@ -5,6 +5,7 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.kylin.cube.CubeSegment;
@@ -15,6 +16,7 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRawScanner;
 import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.gridtable.GTScanRange;
 import org.apache.kylin.storage.gridtable.GTScanRangePlanner;
@@ -24,33 +26,38 @@ import org.apache.kylin.storage.gridtable.IGTScanner;
 
 import com.google.common.collect.Lists;
 
-public class CubeScanner {
+public class CubeScanner implements IGTScanner {
 
     private static final int MAX_SCAN_RANGES = 200;
-    
+
     final GTInfo info;
+    final CubeHBaseReadonlyStore store;
     final List<GTScanRequest> scanRequests;
+    final Scanner scanner;
 
     public CubeScanner(CubeSegment cubeSeg, Set<TblColRef> dimensions, Set<TblColRef> groups, //
             Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) {
         Cuboid cuboid = context.getCuboid();
         info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
-        
+        store = new CubeHBaseReadonlyStore(info, cubeSeg, cuboid);
+
         TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, cuboid.getColumns(), groups);
         BitSet gtDimensions = makeGridTableColumns(cuboid, dimensions);
         BitSet gtAggrGroups = makeGridTableColumns(cuboid, groups);
         BitSet gtAggrMetrics = makeGridTableColumns(cubeSeg.getCubeDesc(), cuboid, metrics);
         String[] gtAggrFuncs = makeAggrFuncs(metrics);
-        
+
         GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info);
         List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
-        
+
         scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
         for (GTScanRange range : scanRanges) {
             scanRequests.add(new GTScanRequest(info, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter));
         }
+        
+        scanner = new Scanner();
     }
-    
+
     private BitSet makeGridTableColumns(Cuboid cuboid, Set<TblColRef> dimensions) {
         BitSet result = new BitSet();
         List<TblColRef> dimCols = cuboid.getColumns();
@@ -69,7 +76,7 @@ public class CubeScanner {
             int index = cubeDesc.getMeasures().indexOf(metric);
             if (index < 0)
                 throw new IllegalStateException(metric + " not found in " + cubeDesc);
-            
+
             result.set(metricsIndexStart + index);
         }
         return result;
@@ -84,44 +91,118 @@ public class CubeScanner {
         return result;
     }
 
-    public IGTScanner scan() {
-        return new Scanner();
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return scanner.iterator();
     }
-    
-    private class Scanner implements IGTScanner {
-        int curRequestIndex = 0;
-        
-        // TODO hbase metrics
-        int scannedRowCount = 0;
-        int scannedRowBlockCount = 0;
 
-        @Override
+    @Override
+    public void close() throws IOException {
+        scanner.close();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return scanner.getScannedRowCount();
+    }
+
+    @Override
+    public int getScannedRowBlockCount() {
+        return scanner.getScannedRowBlockCount();
+    }
+
+    private class Scanner {
+        final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
+        int cur = 0;
+        Iterator<GTRecord> curIterator = null;
+        GTRecord next = null;
+
         public Iterator<GTRecord> iterator() {
-            // TODO Auto-generated method stub
-            return null;
-        }
+            return new Iterator<GTRecord>() {
 
-        @Override
-        public void close() throws IOException {
-            // TODO Auto-generated method stub
-            
+                @Override
+                public boolean hasNext() {
+                    if (next != null)
+                        return true;
+                    
+                    if (curIterator == null) {
+                        if (cur >= scanRequests.size())
+                            return false;
+
+                        try {
+                            inputScanners[cur] = new GTRawScanner(info, store, scanRequests.get(cur));
+                            curIterator = inputScanners[cur].iterator();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    
+                    if (curIterator.hasNext() == false) {
+                        curIterator = null;
+                        cur++;
+                        return hasNext();
+                    }
+                    
+                    next = curIterator.next();
+                    return true;
+                }
+
+                @Override
+                public GTRecord next() {
+                    // fetch next record
+                    if (next == null) {
+                        hasNext();
+                        if (next == null)
+                            throw new NoSuchElementException();
+                    }
+
+                    GTRecord result = next;
+                    next = null;
+                    return result;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
         }
 
-        @Override
-        public GTInfo getInfo() {
-            return info;
+        public void close() throws IOException {
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] != null) {
+                    inputScanners[i].close();
+                }
+            }
         }
 
-        @Override
         public int getScannedRowCount() {
-            return scannedRowCount;
+            int result = 0;
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] == null)
+                    break;
+                
+                result += inputScanners[i].getScannedRowCount();
+            }
+            return result;
         }
 
-        @Override
         public int getScannedRowBlockCount() {
-            return scannedRowBlockCount;
+            int result = 0;
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] == null)
+                    break;
+                
+                result += inputScanners[i].getScannedRowBlockCount();
+            }
+            return result;
         }
-        
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index 7db8eed..14a3efa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -21,6 +21,9 @@ public class GTAggregateScanner implements IGTScanner {
     final IGTScanner inputScanner;
 
     public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+        if (req.hasAggregation() == false)
+            throw new IllegalStateException();
+        
         this.info = inputScanner.getInfo();
         this.dimensions = (BitSet) req.getColumns().clone();
         this.dimensions.andNot(req.getAggrMetrics());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
index 63090e6..ebb6ce9 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTFilterScanner.java
@@ -1,7 +1,6 @@
 package org.apache.kylin.storage.gridtable;
 
 import java.io.IOException;
-import java.util.BitSet;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
@@ -10,123 +9,73 @@ import org.apache.kylin.metadata.filter.IFilterCodeSystem;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreScanner;
 
 public class GTFilterScanner implements IGTScanner {
 
-    final GTInfo info;
-    final IGTStoreScanner storeScanner;
-    final TupleFilter filter;
-    final BitSet selectedColBlocks;
+    final private IGTScanner inputScanner;
+    final private TupleFilter filter;
+    final private IEvaluatableTuple oneTuple; // avoid instance creation
+    
+    private GTRecord next = null;
 
-    private GTRowBlock.Reader curBlockReader;
-    private GTRecord next;
-    final private GTRecord oneRecord; // avoid instance creation
-    final private TupleAdapter oneTuple; // avoid instance creation
-
-    private int scannedRowCount = 0;
-    private int scannedRowBlockCount = 0;
-
-    public GTFilterScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
-        this.info = info;
+    public GTFilterScanner(IGTScanner inputScanner, GTScanRequest req) throws IOException {
+        this.inputScanner = inputScanner;
         this.filter = req.getFilterPushDown();
+        this.oneTuple = new IEvaluatableTuple() {
+            @Override
+            public Object getValue(TblColRef col) {
+                return next.get(col.getColumn().getZeroBasedIndex());
+            }
+        };
 
         if (TupleFilter.isEvaluableRecursively(filter) == false)
             throw new IllegalArgumentException();
-
-        ByteArray start = makeScanKey(req.getPkStart());
-        ByteArray end = makeScanKey(req.getPkEnd());
-        this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
-
-        this.storeScanner = store.scan(start, end, selectedColBlocks, req);
-        this.oneRecord = new GTRecord(info);
-        this.oneTuple = new TupleAdapter(oneRecord);
-    }
-
-    private ByteArray makeScanKey(GTRecord rec) {
-        int firstPKCol = info.primaryKey.nextSetBit(0);
-        if (rec == null || rec.cols[firstPKCol].array() == null)
-            return null;
-
-        BitSet selectedColumns = new BitSet();
-        int len = 0;
-        for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) {
-            if (rec.cols[i].array() == null) {
-                break;
-            }
-            selectedColumns.set(i);
-            len += rec.cols[i].length();
-        }
-
-        ByteArray buf = ByteArray.allocate(len);
-        rec.exportColumns(selectedColumns, buf);
-        return buf;
     }
 
     @Override
     public GTInfo getInfo() {
-        return info;
+        return inputScanner.getInfo();
     }
 
     @Override
     public int getScannedRowCount() {
-        return scannedRowCount;
+        return inputScanner.getScannedRowCount();
     }
 
     @Override
     public int getScannedRowBlockCount() {
-        return scannedRowBlockCount;
+        return inputScanner.getScannedRowBlockCount();
     }
 
     @Override
     public void close() throws IOException {
-        storeScanner.close();
+        inputScanner.close();
     }
 
     @Override
     public Iterator<GTRecord> iterator() {
         return new Iterator<GTRecord>() {
+            
+            private Iterator<GTRecord> inputIterator = inputScanner.iterator();
 
             @Override
             public boolean hasNext() {
                 if (next != null)
                     return true;
 
-                IFilterCodeSystem<ByteArray> filterCodeSystem = info.codeSystem.getFilterCodeSystem();
+                IFilterCodeSystem<ByteArray> filterCodeSystem = getInfo().codeSystem.getFilterCodeSystem();
 
-                while (fetchNext()) {
+                while (inputIterator.hasNext()) {
+                    next = inputIterator.next();
                     if (filter != null && filter.evaluate(oneTuple, filterCodeSystem) == false) {
                         continue;
                     }
-                    next = oneRecord;
                     return true;
                 }
+                next = null;
                 return false;
             }
 
-            private boolean fetchNext() {
-                while (true) {
-                    // get a block
-                    if (curBlockReader == null) {
-                        if (storeScanner.hasNext()) {
-                            curBlockReader = storeScanner.next().getReader(selectedColBlocks);
-                            scannedRowBlockCount++;
-                        } else {
-                            return false;
-                        }
-                    }
-                    // if block exhausted, try next block
-                    if (curBlockReader.hasNext() == false) {
-                        curBlockReader = null;
-                        continue;
-                    }
-                    // fetch a row
-                    curBlockReader.fetchNext(oneRecord);
-                    scannedRowCount++;
-                    return true;
-                }
-            }
-
             @Override
             public GTRecord next() {
                 // fetch next record
@@ -148,20 +97,4 @@ public class GTFilterScanner implements IGTScanner {
 
         };
     }
-
-    private static class TupleAdapter implements IEvaluatableTuple {
-
-        private GTRecord r;
-
-        private TupleAdapter(GTRecord r) {
-            this.r = r;
-        }
-
-        @Override
-        public Object getValue(TblColRef col) {
-            return r.get(col.getColumn().getZeroBasedIndex());
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
index 3c8d862..449b174 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
@@ -58,7 +58,7 @@ public class GTInvertedIndex {
         }
 
         for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) {
-            index[i].add(distinctValues[i], block.sequenceId());
+            index[i].add(distinctValues[i], block.getSequenceId());
         }
 
         nIndexedBlocks = Math.max(nIndexedBlocks, block.seqId + 1);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
new file mode 100644
index 0000000..895ccf3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
@@ -0,0 +1,136 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.storage.gridtable.IGTStore.IGTStoreScanner;
+
+public class GTRawScanner implements IGTScanner {
+
+    final GTInfo info;
+    final IGTStoreScanner storeScanner;
+    final BitSet selectedColBlocks;
+
+    private GTRowBlock.Reader curBlockReader;
+    private GTRecord next;
+    final private GTRecord oneRecord; // avoid instance creation
+
+    private int scannedRowCount = 0;
+    private int scannedRowBlockCount = 0;
+
+    public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
+        this.info = info;
+
+        ByteArray start = makeScanKey(req.getPkStart());
+        ByteArray end = makeScanKey(req.getPkEnd());
+        this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
+
+        this.storeScanner = store.scan(start, end, selectedColBlocks, req);
+        this.oneRecord = new GTRecord(info);
+    }
+
+    private ByteArray makeScanKey(GTRecord rec) {
+        int firstPKCol = info.primaryKey.nextSetBit(0);
+        if (rec == null || rec.cols[firstPKCol].array() == null)
+            return null;
+
+        BitSet selectedColumns = new BitSet();
+        int len = 0;
+        for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) {
+            if (rec.cols[i].array() == null) {
+                break;
+            }
+            selectedColumns.set(i);
+            len += rec.cols[i].length();
+        }
+
+        ByteArray buf = ByteArray.allocate(len);
+        rec.exportColumns(selectedColumns, buf);
+        return buf;
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return scannedRowCount;
+    }
+
+    @Override
+    public int getScannedRowBlockCount() {
+        return scannedRowBlockCount;
+    }
+
+    @Override
+    public void close() throws IOException {
+        storeScanner.close();
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return new Iterator<GTRecord>() {
+
+            @Override
+            public boolean hasNext() {
+                if (next != null)
+                    return true;
+
+                if (fetchOneRecord()) {
+                    next = oneRecord;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+            private boolean fetchOneRecord() {
+                while (true) {
+                    // get a block
+                    if (curBlockReader == null) {
+                        if (storeScanner.hasNext()) {
+                            curBlockReader = storeScanner.next().getReader(selectedColBlocks);
+                            scannedRowBlockCount++;
+                        } else {
+                            return false;
+                        }
+                    }
+                    // if block exhausted, try next block
+                    if (curBlockReader.hasNext() == false) {
+                        curBlockReader = null;
+                        continue;
+                    }
+                    // fetch a row
+                    curBlockReader.fetchNext(oneRecord);
+                    scannedRowCount++;
+                    return true;
+                }
+            }
+
+            @Override
+            public GTRecord next() {
+                // fetch next record
+                if (next == null) {
+                    hasNext();
+                    if (next == null)
+                        throw new NoSuchElementException();
+                }
+
+                GTRecord result = next;
+                next = null;
+                return result;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 62da0b4..4732142 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -32,17 +32,25 @@ public class GTRowBlock {
     /** create a row block that has no underlying space */
     public GTRowBlock(GTInfo info) {
         this.info = info;
-        primaryKey = new ByteArray();
-        cellBlocks = new ByteArray[info.colBlocks.length];
-        for (int i = 0; i < cellBlocks.length; i++) {
-            cellBlocks[i] = new ByteArray();
+        this.primaryKey = new ByteArray();
+        this.cellBlocks = new ByteArray[info.colBlocks.length];
+        for (int i = 0; i < this.cellBlocks.length; i++) {
+            this.cellBlocks[i] = new ByteArray();
         }
     }
     
-    public int sequenceId() {
+    public int getSequenceId() {
         return seqId;
     }
     
+    public ByteArray getPrimaryKey() {
+        return primaryKey;
+    }
+    
+    public ByteArray getCellBlock(int i) {
+        return cellBlocks[i];
+    }
+    
     public Writer getWriter() {
         return new Writer();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
index f2f775b..20b543a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GridTable.java
@@ -32,7 +32,11 @@ public class GridTable {
     }
 
     public IGTScanner scan(GTScanRequest req) throws IOException {
-        IGTScanner result = new GTFilterScanner(info, store, req);
+        IGTScanner result = new GTRawScanner(info, store, req);
+        
+        if (req.hasFilterPushDown()) {
+            result = new GTFilterScanner(result, req);
+        }
         if (req.hasAggregation()) {
             result = new GTAggregateScanner(result, req);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5e633b79/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index ba92f8d..32c7f36 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -58,7 +58,7 @@ public class GTSimpleMemStore implements IGTStore {
         @Override
         public void write(GTRowBlock block) throws IOException {
             GTRowBlock copy = block.copy();
-            int id = block.sequenceId();
+            int id = block.getSequenceId();
             if (id < rowBlockList.size()) {
                 rowBlockList.set(id, copy);
             } else {