You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:01 UTC

[24/50] incubator-kylin git commit: KYLIN-625, refactor interface to use GTScanRange

KYLIN-625, refactor interface to use GTScanRange


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d1369339
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d1369339
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d1369339

Branch: refs/heads/streaming-localdict
Commit: d1369339d458dfc974de3f63ef3d7c496e910c8a
Parents: b38206d
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Mar 27 13:25:47 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Mar 27 13:25:47 2015 +0800

----------------------------------------------------------------------
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |   2 +-
 .../kylin/storage/gridtable/GTScanRange.java    |  61 +++
 .../storage/gridtable/GTScanRangePlanner.java   | 474 +++++++++++++++++++
 .../kylin/storage/gridtable/GTScanRequest.java  |  22 +-
 .../kylin/storage/gridtable/GridTableTest.java  |   2 +-
 5 files changed, 548 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1369339/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index 5a3565a..ebc65a1 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -164,7 +164,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Tex
             System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, Bytes.toBytes(cuboidId).length);
 
             GridTable gt = cuboidsMap.get(cuboidId);
-            GTScanRequest req = new GTScanRequest(gt.getInfo(), null, null, null, null);
+            GTScanRequest req = new GTScanRequest(gt.getInfo(), null, null, null);
             IGTScanner scanner = gt.scan(req);
             int offSet = 0;
             for (GTRecord record : scanner) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1369339/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java
new file mode 100644
index 0000000..08513f7
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRange.java
@@ -0,0 +1,61 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A primary key interval to scan in a grid table, plus optional fuzzy keys.
+ *
+ * Both ends of the interval are inclusive. Either end may be null; equals() and hashCode()
+ * handle that case (presumably a null end means "unbounded" -- confirm against the scanner).
+ */
+public class GTScanRange {
+
+    final public GTRecord pkStart; // inclusive
+    final public GTRecord pkEnd; // inclusive
+    final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys
+
+    /**
+     * Creates a range without fuzzy keys.
+     */
+    public GTScanRange(GTRecord pkStart, GTRecord pkEnd) {
+        this(pkStart, pkEnd, null);
+    }
+
+    /**
+     * @param pkStart        first key of the range (inclusive), may be null
+     * @param pkEnd          last key of the range (inclusive), may be null
+     * @param hbaseFuzzyKeys partially matching keys; null is stored as an empty list
+     */
+    public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) {
+        // The sanity checks only apply to the ends that are present; dereferencing a null
+        // end here would turn a legal "no bound" range into a NullPointerException under -ea.
+        assert pkStart == null || pkEnd == null || pkStart.info == pkEnd.info;
+        assert pkStart == null || pkStart.maskForEqualHashComp() == pkStart.info.primaryKey;
+        assert pkEnd == null || pkEnd.maskForEqualHashComp() == pkEnd.info.primaryKey;
+        this.pkStart = pkStart;
+        this.pkEnd = pkEnd;
+        this.hbaseFuzzyKeys = hbaseFuzzyKeys == null ? Collections.<GTRecord>emptyList() : hbaseFuzzyKeys;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        // hbaseFuzzyKeys is never null after construction
+        result = prime * result + hbaseFuzzyKeys.hashCode();
+        result = prime * result + ((pkEnd == null) ? 0 : pkEnd.hashCode());
+        result = prime * result + ((pkStart == null) ? 0 : pkStart.hashCode());
+        return result;
+    }
+
+    /**
+     * Two ranges are equal when their start key, end key and fuzzy keys are all equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        GTScanRange other = (GTScanRange) obj;
+        if (!hbaseFuzzyKeys.equals(other.hbaseFuzzyKeys))
+            return false;
+        if (pkEnd == null) {
+            if (other.pkEnd != null)
+                return false;
+        } else if (!pkEnd.equals(other.pkEnd))
+            return false;
+        if (pkStart == null) {
+            if (other.pkStart != null)
+                return false;
+        } else if (!pkStart.equals(other.pkStart))
+            return false;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1369339/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
new file mode 100644
index 0000000..cc58253
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRangePlanner.java
@@ -0,0 +1,474 @@
+package org.apache.kylin.storage.gridtable;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.IFilterCodeSystem;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class GTScanRangePlanner {
+
+    private static final int MAX_HBASE_FUZZY_KEYS = 100;
+    
+    final private GTInfo info;
+    final private ComparatorEx<ByteArray> byteUnknownIsSmaller;
+    final private ComparatorEx<ByteArray> byteUnknownIsBigger;
+    final private ComparatorEx<GTRecord> recordUnknownIsSmaller;
+    final private ComparatorEx<GTRecord> recordUnknownIsBigger;
+
+    /**
+     * Creates a planner for the given grid table.
+     *
+     * The filter code system of the table is looked up once and used to build the four
+     * comparators (byte-level and record-level, each in an "unknown is smaller" and an
+     * "unknown is bigger" flavour) that the planner relies on.
+     *
+     * @param info description of the grid table whose scans are planned
+     */
+    public GTScanRangePlanner(GTInfo info) {
+        this.info = info;
+
+        final IFilterCodeSystem<ByteArray> codeSystem = info.codeSystem.getFilterCodeSystem();
+
+        // comparators that order an unknown value before every known value
+        this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(codeSystem);
+        this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(codeSystem);
+
+        // comparators that order an unknown value after every known value
+        this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(codeSystem);
+        this.recordUnknownIsBigger = recordComparatorTreatsUnknownBigger(codeSystem);
+    }
+
+    /**
+     * Turns a tuple filter into the list of primary key ranges that have to be scanned.
+     *
+     * The filter is flattened into OR-of-AND form, every AND branch becomes one scan range,
+     * overlapping ranges are merged, and finally the list is reduced if it is too long.
+     *
+     * @param filter    the filter to plan for; null is passed through to the helpers
+     * @param maxRanges limit handed to mergeTooManyRanges()
+     * @return the planned scan ranges
+     */
+    public List<GTScanRange> planScanRanges(TupleFilter filter, int maxRanges) {
+
+        List<Collection<ColumnRange>> orBranches = translateToOrAndDimRanges(flattenToOrAndFilter(filter));
+
+        // one scan range per AND branch
+        List<GTScanRange> rangePerBranch = Lists.newArrayListWithCapacity(orBranches.size());
+        for (Collection<ColumnRange> branch : orBranches) {
+            rangePerBranch.add(newScanRange(branch));
+        }
+
+        List<GTScanRange> withoutOverlap = mergeOverlapRanges(rangePerBranch);
+        return mergeTooManyRanges(withoutOverlap, maxRanges);
+    }
+
+    /**
+     * Builds one scan range from the column ranges of a single AND branch.
+     *
+     * Only ranges on primary key columns contribute; others are skipped. For each such
+     * column the begin/end of the range is written into the start/end key. Every discrete
+     * value of an EQ/IN range additionally yields a fuzzy key that has only that column set.
+     *
+     * @param andDimRanges column ranges that must all hold
+     * @return the scan range covering the branch
+     */
+    private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) {
+        GTRecord pkStart = new GTRecord(info);
+        GTRecord pkEnd = new GTRecord(info);
+        List<GTRecord> hbaseFuzzyKeys = Lists.newArrayList();
+
+        for (ColumnRange range : andDimRanges) {
+            int col = range.column.getColumn().getZeroBasedIndex();
+            if (info.primaryKey.get(col) == false)
+                continue;
+
+            pkStart.set(col, range.begin);
+            pkEnd.set(col, range.end);
+
+            // only EQ/IN ranges carry discrete values; for LT/GT style ranges 'equals' is
+            // null and there is nothing to turn into fuzzy keys
+            if (range.equals == null)
+                continue;
+
+            BitSet fuzzyMask = new BitSet();
+            fuzzyMask.set(col);
+            for (ByteArray v : range.equals) {
+                GTRecord fuzzy = new GTRecord(info);
+                fuzzy.set(col, v);
+                fuzzy.maskForEqualHashComp(fuzzyMask);
+                hbaseFuzzyKeys.add(fuzzy);
+            }
+        }
+
+        pkStart.maskForEqualHashComp(info.primaryKey);
+        pkEnd.maskForEqualHashComp(info.primaryKey);
+        return new GTScanRange(pkStart, pkEnd, hbaseFuzzyKeys);
+    }
+
+    /**
+     * Flattens the filter and makes sure its root is an OR node.
+     *
+     * A flattened filter whose root is AND is wrapped into a new OR node with that AND as
+     * its only child.
+     *
+     * @param filter the filter to normalize, may be null
+     * @return null for a null filter, otherwise a filter whose root operator is OR
+     * @throws IllegalStateException if the root is still not OR after normalization
+     */
+    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
+        if (filter == null)
+            return null;
+
+        TupleFilter normalized = filter.flatFilter();
+
+        // a lone AND becomes OR(AND) so that callers always see the same shape
+        if (normalized.getOperator() == FilterOperatorEnum.AND) {
+            LogicalTupleFilter orRoot = new LogicalTupleFilter(FilterOperatorEnum.OR);
+            orRoot.addChild(normalized);
+            normalized = orRoot;
+        }
+
+        if (normalized.getOperator() == FilterOperatorEnum.OR)
+            return normalized;
+
+        throw new IllegalStateException();
+    }
+
+    /**
+     * Translates an OR-of-AND filter into column ranges, one collection per AND branch.
+     *
+     * A null filter yields a single empty branch. Otherwise every child of the root must be
+     * an AND node; the result is cleaned up by preprocessConstantConditions().
+     *
+     * @param flatFilter filter in OR-of-AND form, or null
+     * @return the column ranges of every OR branch
+     * @throws IllegalStateException if a child of the root is not an AND node
+     */
+    private List<Collection<ColumnRange>> translateToOrAndDimRanges(TupleFilter flatFilter) {
+        List<Collection<ColumnRange>> branches = Lists.newArrayList();
+
+        // no filter at all: one branch without any condition
+        if (flatFilter == null) {
+            branches.add(Collections.<ColumnRange> emptyList());
+            return branches;
+        }
+
+        for (TupleFilter child : flatFilter.getChildren()) {
+            if (child.getOperator() == FilterOperatorEnum.AND) {
+                branches.add(translateToAndDimRanges(child.getChildren()));
+                continue;
+            }
+            throw new IllegalStateException("Filter should be AND instead of " + child);
+        }
+
+        return preprocessConstantConditions(branches);
+    }
+
+    /**
+     * Collects the column ranges implied by the compare filters of one AND branch.
+     *
+     * Filters that are not compare filters, or that have no column, are ignored. Several
+     * conditions on the same column are combined with ColumnRange.andMerge().
+     *
+     * @param andFilters the children of an AND node
+     * @return one range per referenced column, as the value view of the internal map
+     */
+    private Collection<ColumnRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters) {
+        Map<TblColRef, ColumnRange> rangeByColumn = new HashMap<TblColRef, ColumnRange>();
+        for (TupleFilter condition : andFilters) {
+            if (!(condition instanceof CompareTupleFilter)) {
+                continue;
+            }
+
+            CompareTupleFilter compare = (CompareTupleFilter) condition;
+            if (compare.getColumn() == null) {
+                continue;
+            }
+
+            @SuppressWarnings("unchecked")
+            ColumnRange candidate = new ColumnRange(compare.getColumn(), (Set<ByteArray>) compare.getValues(), compare.getOperator());
+            ColumnRange known = rangeByColumn.get(candidate.column);
+            if (known != null) {
+                known.andMerge(candidate);
+            } else {
+                rangeByColumn.put(candidate.column, candidate);
+            }
+        }
+        return rangeByColumn.values();
+    }
+
+    /**
+     * Removes conditions and branches whose outcome is already known.
+     *
+     * Ranges that satisfy everything are dropped from their branch. A branch containing a
+     * range that satisfies nothing is dropped entirely. As soon as a branch ends up without
+     * conditions, the whole list is replaced by a single empty branch.
+     *
+     * @param orAndRanges the OR branches; modified in place
+     * @return the same list instance after simplification
+     */
+    private List<Collection<ColumnRange>> preprocessConstantConditions(List<Collection<ColumnRange>> orAndRanges) {
+        boolean matchesEverything = false;
+
+        for (Iterator<Collection<ColumnRange>> branchIt = orAndRanges.iterator(); branchIt.hasNext();) {
+            Collection<ColumnRange> branch = branchIt.next();
+
+            boolean branchIsFalse = false;
+            for (Iterator<ColumnRange> rangeIt = branch.iterator(); rangeIt.hasNext();) {
+                ColumnRange range = rangeIt.next();
+                if (range.satisfyAll()) {
+                    rangeIt.remove();
+                } else if (range.satisfyNone()) {
+                    branchIsFalse = true;
+                }
+            }
+
+            if (branchIsFalse) {
+                branchIt.remove();
+                continue;
+            }
+            if (branch.isEmpty()) {
+                // a branch without conditions makes the whole OR always true
+                matchesEverything = true;
+                break;
+            }
+        }
+
+        if (!matchesEverything) {
+            return orAndRanges;
+        }
+
+        orAndRanges.clear();
+        orAndRanges.add(Collections.<ColumnRange> emptyList());
+        return orAndRanges;
+    }
+
+    /**
+     * Merges scan ranges that overlap each other.
+     *
+     * The given list is sorted in place by start key. It is then walked once while tracking
+     * the largest end key seen in the current group; a range whose start key is not beyond
+     * that end key joins the group, otherwise the group is closed and merged through
+     * mergeKeyRange(). Lists with at most one element are returned unchanged.
+     *
+     * @param ranges the ranges to merge; reordered by this method
+     * @return a new list of merged ranges, or the input list itself if it has at most one element
+     */
+    private List<GTScanRange> mergeOverlapRanges(List<GTScanRange> ranges) {
+        if (ranges.size() <= 1) {
+            return ranges;
+        }
+
+        // sort ranges by start key
+        Collections.sort(ranges, new Comparator<GTScanRange>() {
+            @Override
+            public int compare(GTScanRange a, GTScanRange b) {
+                return recordUnknownIsSmaller.compare(a.pkStart, b.pkStart);
+            }
+        });
+
+        // merge the overlap range
+        List<GTScanRange> mergedRanges = new ArrayList<GTScanRange>();
+        int mergeBeginIndex = 0; // index of the first range in the group being built
+        GTRecord mergeEnd = ranges.get(0).pkEnd; // largest end key of the group so far
+        for (int index = 0; index < ranges.size(); index++) {
+            GTScanRange range = ranges.get(index);
+            
+            // if overlap, swallow it
+            // (overlap = start key is not after mergeEnd under either treatment of unknown
+            // values; the check relies on min()/max() returning one of their arguments)
+            if (recordUnknownIsSmaller.min(range.pkStart, mergeEnd) == range.pkStart //
+                    || recordUnknownIsBigger.max(mergeEnd, range.pkStart) == mergeEnd) {
+                mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+                continue;
+            }
+            
+            // not overlap, split here
+            GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, index));
+            mergedRanges.add(mergedRange);
+            
+            // start new split
+            // NOTE(review): the new group's end is max(previous mergeEnd, range.pkEnd) rather
+            // than range.pkEnd alone -- equivalent only while pkEnd >= pkStart; confirm
+            mergeBeginIndex = index;
+            mergeEnd = recordUnknownIsBigger.max(mergeEnd, range.pkEnd);
+        }
+        
+        // don't miss the last range
+        GTScanRange mergedRange = mergeKeyRange(ranges.subList(mergeBeginIndex, ranges.size()));
+        mergedRanges.add(mergedRange);
+        
+        return mergedRanges;
+    }
+
+    /**
+     * Collapses several scan ranges into one.
+     *
+     * The start key is taken from the first range, the end key is the largest end key of
+     * all ranges, and the fuzzy keys are the concatenation of all fuzzy keys. A single
+     * range is returned as is.
+     *
+     * @param ranges the ranges to collapse; must not be empty
+     * @return the covering range
+     */
+    private GTScanRange mergeKeyRange(List<GTScanRange> ranges) {
+        GTScanRange head = ranges.get(0);
+        if (ranges.size() == 1)
+            return head;
+
+        GTRecord largestEnd = head.pkEnd;
+        boolean sawRangeWithoutFuzzyKeys = false;
+        List<GTRecord> collectedFuzzyKeys = new ArrayList<GTRecord>();
+
+        for (GTScanRange each : ranges) {
+            if (each.hbaseFuzzyKeys.isEmpty()) {
+                sawRangeWithoutFuzzyKeys = true;
+            }
+            collectedFuzzyKeys.addAll(each.hbaseFuzzyKeys);
+            largestEnd = recordUnknownIsBigger.max(largestEnd, each.pkEnd);
+        }
+
+        // if any range is non-fuzzy, then all fuzzy keys must be cleared
+        // also too many fuzzy keys will slow down HBase scan
+        boolean tooManyFuzzyKeys = collectedFuzzyKeys.size() > MAX_HBASE_FUZZY_KEYS;
+        if (sawRangeWithoutFuzzyKeys || tooManyFuzzyKeys) {
+            collectedFuzzyKeys.clear();
+        }
+
+        return new GTScanRange(head.pkStart, largestEnd, collectedFuzzyKeys);
+    }
+
+    /**
+     * Reduces the number of scan ranges when there are more than allowed.
+     *
+     * A list that holds at most maxRanges ranges (the limit itself included) is returned
+     * untouched, as is a list with fewer than two ranges since there is nothing to merge.
+     * Otherwise all ranges are collapsed into a single covering range.
+     *
+     * @param ranges    the ranges to check, expected to be ordered by start key
+     * @param maxRanges the largest acceptable number of ranges
+     * @return the input list, or a one-element list holding the merged range
+     */
+    private List<GTScanRange> mergeTooManyRanges(List<GTScanRange> ranges, int maxRanges) {
+        // exactly maxRanges ranges is still within the limit; the size() <= 1 guard keeps an
+        // empty list away from mergeKeyRange(), which reads the first element
+        if (ranges.size() <= 1 || ranges.size() <= maxRanges) {
+            return ranges;
+        }
+
+        // TODO: check the distance between range and merge the large distance range
+        List<GTScanRange> result = new ArrayList<GTScanRange>(1);
+        GTScanRange mergedRange = mergeKeyRange(ranges);
+        result.add(mergedRange);
+        return result;
+    }
+
+    /**
+     * The constraint that the conditions of one AND branch put on a single column.
+     *
+     * A range is either a set of discrete values ('equals' is non-null, from EQ/IN) or an
+     * interval described by 'begin' and 'end'. A bound whose array() is null is unset, i.e.
+     * the interval is open on that side.
+     */
+    private class ColumnRange {
+        private TblColRef column;
+        private ByteArray begin = new ByteArray();
+        private ByteArray end = new ByteArray();
+        private Set<ByteArray> equals;
+
+        /**
+         * @param column the column the condition is on
+         * @param values the constant values of the condition
+         * @param op     the compare operator
+         * @throws UnsupportedOperationException for operators not listed in the switch
+         */
+        public ColumnRange(TblColRef column, Set<ByteArray> values, FilterOperatorEnum op) {
+            this.column = column;
+
+            switch (op) {
+            case EQ:
+            case IN:
+                equals = new HashSet<ByteArray>(values);
+                refreshBeginEndFromEquals();
+                break;
+            case LT:
+            case LTE:
+                end = byteUnknownIsBigger.max(values);
+                break;
+            case GT:
+            case GTE:
+                begin = byteUnknownIsSmaller.min(values);
+                break;
+            case NEQ:
+            case NOTIN:
+            case ISNULL:
+            case ISNOTNULL:
+                // let Optiq filter it!
+                break;
+            default:
+                throw new UnsupportedOperationException(op.name());
+            }
+        }
+
+        /** Overwrites all fields of this range with the given values. */
+        void copy(TblColRef column, ByteArray beginValue, ByteArray endValue, Set<ByteArray> equalValues) {
+            this.column = column;
+            this.begin = beginValue;
+            this.end = endValue;
+            this.equals = equalValues;
+        }
+
+        /** Recomputes begin/end as the smallest/largest of the discrete values. */
+        private void refreshBeginEndFromEquals() {
+            if (this.equals.isEmpty()) {
+                // an intersection can leave no value at all; there is no min/max to take,
+                // so fall back to the same unset bounds the field initializers use
+                this.begin = new ByteArray();
+                this.end = new ByteArray();
+                return;
+            }
+            this.begin = byteUnknownIsSmaller.min(this.equals);
+            this.end = byteUnknownIsBigger.max(this.equals);
+        }
+
+        /**
+         * @return true if the range does not restrict the column at all
+         */
+        public boolean satisfyAll() {
+            // the NEQ case; a value set is never "all", an empty one is the opposite
+            return equals == null && begin.array() == null && end.array() == null;
+        }
+
+        /**
+         * @return true if no value can fall into the range
+         */
+        public boolean satisfyNone() {
+            if (equals != null) {
+                return equals.isEmpty();
+            } else if (begin.array() != null && end.array() != null) {
+                return info.codeSystem.getFilterCodeSystem().compare(begin, end) > 0;
+            } else {
+                return false;
+            }
+        }
+
+        /**
+         * Narrows this range so that it also fulfils another condition on the same column.
+         *
+         * @param another the range to intersect with; not modified
+         */
+        public void andMerge(ColumnRange another) {
+            assert this.column.equals(another.column);
+
+            if (another.satisfyAll()) {
+                return;
+            }
+
+            if (this.satisfyAll()) {
+                copy(another.column, another.begin, another.end, another.equals);
+                return;
+            }
+
+            // values AND values: keep the common ones
+            if (this.equals != null && another.equals != null) {
+                this.equals.retainAll(another.equals);
+                refreshBeginEndFromEquals();
+                return;
+            }
+
+            // values AND interval: keep the values inside the interval
+            if (this.equals != null) {
+                this.equals = filter(this.equals, another.begin, another.end);
+                refreshBeginEndFromEquals();
+                return;
+            }
+
+            if (another.equals != null) {
+                this.equals = filter(another.equals, this.begin, this.end);
+                refreshBeginEndFromEquals();
+                return;
+            }
+
+            // interval AND interval: the intersection starts at the larger begin and stops at
+            // the smaller end; an unset bound loses against a set one in both comparators
+            this.begin = byteUnknownIsSmaller.max(this.begin, another.begin);
+            this.end = byteUnknownIsBigger.min(this.end, another.end);
+        }
+
+        /**
+         * Returns a new set with those values that lie within [beginValue, endValue]; an
+         * unset bound does not restrict. The given set is left untouched.
+         */
+        private Set<ByteArray> filter(Set<ByteArray> equalValues, ByteArray beginValue, ByteArray endValue) {
+            Set<ByteArray> result = Sets.newHashSetWithExpectedSize(equalValues.size());
+            for (ByteArray v : equalValues) {
+                if (byteUnknownIsSmaller.compare(beginValue, v) <= 0 && byteUnknownIsBigger.compare(v, endValue) <= 0) {
+                    result.add(v);
+                }
+            }
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            if (equals == null) {
+                return column.getName() + " between " + begin + " and " + end;
+            } else {
+                return column.getName() + " in " + equals;
+            }
+        }
+    }
+
+    /**
+     * A comparator with convenience methods for picking the smaller or larger of two values,
+     * the extreme of a collection, and for interval membership.
+     */
+    public static abstract class ComparatorEx<T> implements Comparator<T> {
+
+        /**
+         * @return the smallest element of the collection, or null if it is empty
+         */
+        public T min(Collection<T> v) {
+            // size() can never be negative, so the emptiness test has to be explicit;
+            // otherwise iterator.next() below fails on an empty collection
+            if (v.isEmpty()) {
+                return null;
+            }
+
+            Iterator<T> iterator = v.iterator();
+            T min = iterator.next();
+            while (iterator.hasNext()) {
+                min = min(min, iterator.next());
+            }
+            return min;
+        }
+
+        /**
+         * @return the largest element of the collection, or null if it is empty
+         */
+        public T max(Collection<T> v) {
+            if (v.isEmpty()) {
+                return null;
+            }
+
+            Iterator<T> iterator = v.iterator();
+            T max = iterator.next();
+            while (iterator.hasNext()) {
+                max = max(max, iterator.next());
+            }
+            return max;
+        }
+
+        /**
+         * @return the smaller argument; the first one if both compare as equal
+         */
+        public T min(T a, T b) {
+            return compare(a, b) <= 0 ? a : b;
+        }
+
+        /**
+         * @return the larger argument; the first one if both compare as equal
+         */
+        public T max(T a, T b) {
+            return compare(a, b) >= 0 ? a : b;
+        }
+
+        /**
+         * @return true if start <= v <= end according to this comparator
+         */
+        public boolean between(T v, T start, T end) {
+            return compare(start, v) <= 0 && compare(v, end) <= 0;
+        }
+    }
+
+    /**
+     * Creates a comparator in which an unknown value (array() is null) sorts before every
+     * known value; two unknown values are equal. Known values are compared by the code system.
+     *
+     * @param cs the code system used to compare two known values
+     */
+    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownSmaller(final IFilterCodeSystem<ByteArray> cs) {
+        return new ComparatorEx<ByteArray>() {
+            @Override
+            public int compare(ByteArray a, ByteArray b) {
+                if (a.array() == null)
+                    // unknown vs unknown must be 0, or compare(a, b) and compare(b, a) would
+                    // both claim "smaller" and break the Comparator contract used by sorting
+                    return b.array() == null ? 0 : -1;
+                else if (b.array() == null)
+                    return 1;
+                else
+                    return cs.compare(a, b);
+            }
+        };
+    }
+
+    /**
+     * Creates a comparator in which an unknown value (array() is null) sorts after every
+     * known value; two unknown values are equal. Known values are compared by the code system.
+     *
+     * @param cs the code system used to compare two known values
+     */
+    public static ComparatorEx<ByteArray> byteComparatorTreatsUnknownBigger(final IFilterCodeSystem<ByteArray> cs) {
+        return new ComparatorEx<ByteArray>() {
+            @Override
+            public int compare(ByteArray a, ByteArray b) {
+                if (a.array() == null)
+                    // unknown vs unknown must be 0, or compare(a, b) and compare(b, a) would
+                    // both claim "bigger" and break the Comparator contract
+                    return b.array() == null ? 0 : 1;
+                else if (b.array() == null)
+                    return -1;
+                else
+                    return cs.compare(a, b);
+            }
+        };
+    }
+
+    /**
+     * Creates a record comparator that compares column by column with the
+     * "unknown is smaller" byte comparator.
+     */
+    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownSmaller(IFilterCodeSystem<ByteArray> cs) {
+        ComparatorEx<ByteArray> columnComparator = byteComparatorTreatsUnknownSmaller(cs);
+        return new RecordComparator(columnComparator);
+    }
+
+    /**
+     * Creates a record comparator that compares column by column with the
+     * "unknown is bigger" byte comparator.
+     */
+    public static ComparatorEx<GTRecord> recordComparatorTreatsUnknownBigger(IFilterCodeSystem<ByteArray> cs) {
+        ComparatorEx<ByteArray> columnComparator = byteComparatorTreatsUnknownBigger(cs);
+        return new RecordComparator(columnComparator);
+    }
+
+    /**
+     * Compares two records column by column, over the columns set in their comparison mask,
+     * in ascending column index order. The first differing column decides.
+     */
+    private static class RecordComparator extends ComparatorEx<GTRecord> {
+        final ComparatorEx<ByteArray> comparator;
+
+        RecordComparator(ComparatorEx<ByteArray> byteComparator) {
+            this.comparator = byteComparator;
+        }
+
+        @Override
+        public int compare(GTRecord a, GTRecord b) {
+            assert a.info == b.info;
+            assert a.maskForEqualHashComp() == b.maskForEqualHashComp();
+
+            BitSet columns = a.maskForEqualHashComp();
+            int col = columns.nextSetBit(0);
+            while (col >= 0) {
+                int diff = comparator.compare(a.cols[col], b.cols[col]);
+                if (diff != 0) {
+                    return diff;
+                }
+                col = columns.nextSetBit(col + 1);
+            }
+            // no masked column differs
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1369339/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java
index 977363c..c92cba4 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTScanRequest.java
@@ -12,8 +12,7 @@ public class GTScanRequest {
 
     // basic
     private GTInfo info;
-    private GTRecord pkStart; // inclusive
-    private GTRecord pkEnd; // inclusive
+    private GTScanRange range;
     private BitSet columns;
 
     // optional filtering
@@ -25,23 +24,21 @@ public class GTScanRequest {
     private String[] aggrMetricsFuncs;
     
     public GTScanRequest(GTInfo info) {
-        this(info, null, null, null, null);
+        this(info, null, null, null);
     }
 
-    public GTScanRequest(GTInfo info, GTRecord pkStart, GTRecord pkEnd, BitSet columns, TupleFilter filterPushDown) {
+    public GTScanRequest(GTInfo info, GTScanRange range, BitSet columns, TupleFilter filterPushDown) {
         this.info = info;
-        this.pkStart = pkStart;
-        this.pkEnd = pkEnd;
+        this.range = range;
         this.columns = columns;
         this.filterPushDown = filterPushDown;
         validate();
     }
     
-    public GTScanRequest(GTInfo info, GTRecord pkStart, GTRecord pkEnd, BitSet aggrGroupBy, BitSet aggrMetrics, //
+    public GTScanRequest(GTInfo info, GTScanRange range, BitSet aggrGroupBy, BitSet aggrMetrics, //
             String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
         this.info = info;
-        this.pkStart = pkStart;
-        this.pkEnd = pkEnd;
+        this.range = range;
         this.columns = new BitSet();
         this.filterPushDown = filterPushDown;
         
@@ -53,6 +50,9 @@ public class GTScanRequest {
     }
     
     private void validate() {
+        if (range == null)
+            range = new GTScanRange(null, null);
+        
         if (columns == null)
             columns = (BitSet) info.colAll.clone();
         
@@ -111,11 +111,11 @@ public class GTScanRequest {
     }
 
     public GTRecord getPkStart() {
-        return pkStart;
+        return range.pkStart;
     }
 
     public GTRecord getPkEnd() {
-        return pkEnd;
+        return range.pkEnd;
     }
 
     public BitSet getColumns() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d1369339/storage/src/test/java/org/apache/kylin/storage/gridtable/GridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/GridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/GridTableTest.java
index e2071d6..1a69138 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/GridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/GridTableTest.java
@@ -80,7 +80,7 @@ public class GridTableTest {
     }
 
     private IGTScanner scanAndAggregate(GridTable table) throws IOException {
-        GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+        GTScanRequest req = new GTScanRequest(table.getInfo(), null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
         IGTScanner scanner = table.scan(req);
         int i = 0;
         for (GTRecord r : scanner) {