Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/08 09:03:10 UTC

[kylin] 04/06: KYLIN-3370 enhance segment pruning

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 05baf2123fd0b0fcfee613e1fe18c93722c8ed9b
Author: Li Yang <li...@apache.org>
AuthorDate: Tue May 22 12:00:19 2018 +0800

    KYLIN-3370 enhance segment pruning
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../java/org/apache/kylin/cube/CubeSegment.java    |  12 +
 .../org/apache/kylin/cube/DimensionRangeInfo.java  | 104 ++++++++
 .../apache/kylin/cube/common/SegmentPruner.java    | 179 ++++++++++++++
 .../kylin/cube/gridtable/ScanRangePlannerBase.java |   4 +-
 .../inmemcubing/InputConverterUnitForRawData.java  |   2 -
 .../apache/kylin/cube/kv/RowKeyColumnOrder.java    | 108 --------
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  13 +-
 .../apache/kylin/cube/DimensionRangeInfoTest.java  |  87 +++++++
 .../kylin/cube/common/SegmentPrunerTest.java       | 195 +++++++++++++++
 .../apache/kylin/metadata/datatype/DataType.java   |  26 +-
 .../kylin/metadata/datatype/DataTypeOrder.java     | 155 ++++++++++++
 .../kylin/metadata/filter/CompareTupleFilter.java  |   2 +-
 .../apache/kylin/metadata/filter/TupleFilter.java  | 116 +++++++--
 .../kylin/metadata/datatype/DataTypeOrderTest.java |  57 +++++
 .../kylin/metadata/filter/TupleFilterTest.java     |  77 ++++++
 .../storage/gtrecord/CubeScanRangePlanner.java     |  30 +--
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |   9 +-
 .../kylin/storage/translate/ColumnValueRange.java  | 214 ----------------
 .../storage/translate/DerivedFilterTranslator.java |   6 +-
 .../kylin/storage/translate/HBaseKeyRange.java     | 273 ---------------------
 .../kylin/storage/gtrecord/DictGridTableTest.java  |  69 ++----
 .../storage/translate/ColumnValueRangeTest.java    | 126 ----------
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   3 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java      |   9 +-
 .../mr/steps/CalculateStatsFromBaseCuboidJob.java  |   1 -
 .../mr/steps/FactDistinctColumnPartitioner.java    |   5 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java    |   2 +-
 .../engine/mr/steps/FactDistinctColumnsMapper.java |  43 +---
 .../mr/steps/FactDistinctColumnsMapperBase.java    |  20 +-
 .../mr/steps/FactDistinctColumnsReducer.java       |  90 ++++---
 .../steps/FactDistinctColumnsReducerMapping.java   |  87 +++----
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  83 ++++---
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java     |  20 +-
 .../mr/steps/UpdateCubeInfoAfterOptimizeStep.java  |   1 +
 .../FactDistinctColumnsReducerMappingTest.java     |  12 +-
 .../cube/ssb_cube_with_dimention_range.json        | 110 +++++++++
 .../cube_desc/ssb_cube_with_dimention_range.json   | 269 ++++++++++++++++++++
 .../kylin/provision/BuildCubeWithEngine.java       |  87 +++++--
 .../query/optrule/AggregateMultipleExpandRule.java |  16 +-
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |  26 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |   1 -
 .../apache/kylin/source/hive/HiveTableReader.java  |   2 +-
 42 files changed, 1669 insertions(+), 1082 deletions(-)
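
 In short: each cube segment now carries per-dimension min/max metadata, and the
 query planner consults it to skip READY segments that cannot possibly match the
 query filter. A minimal sketch of the call pattern this commit introduces
 (assumes a resolved cube and filter; illustrative only, not part of the patch):

     // Keep only the READY segments whose dimension ranges could still
     // satisfy the query filter.
     static List<CubeSegment> pruneSegments(CubeInstance cube, TupleFilter filter) {
         SegmentPruner pruner = new SegmentPruner(filter); // extracts must-true compares
         return pruner.listSegmentsForQuery(cube);
     }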

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d49c273..75c90a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -119,6 +119,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
 
+    @JsonProperty("dimension_range_info_map")
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+
     private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
 
     // lazy init
@@ -575,4 +579,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
         this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
     }
+
+    public Map<String, DimensionRangeInfo> getDimensionRangeInfoMap() {
+        return dimensionRangeInfoMap;
+    }
+
+    public void setDimensionRangeInfoMap(Map<String, DimensionRangeInfo> dimensionRangeInfoMap) {
+        this.dimensionRangeInfoMap = dimensionRangeInfoMap;
+    }
 }
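The new map is keyed by column identity (TblColRef.getIdentity()) and is
serialized only when non-empty. A sketch of populating it, with hypothetical
values mirroring the test fixtures further below:

    Map<String, DimensionRangeInfo> ranges = Maps.newHashMap();
    ranges.put("TEST_KYLIN_FACT.CAL_DT", new DimensionRangeInfo("2012-01-01", "2012-05-31"));
    seg.setDimensionRangeInfoMap(ranges); // persisted as "dimension_range_info_map"
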
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
new file mode 100644
index 0000000..e36ca96
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class DimensionRangeInfo {
+
+    private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
+
+    public static Map<String, DimensionRangeInfo> mergeRangeMap(DataModelDesc model, Map<String, DimensionRangeInfo> m1,
+            Map<String, DimensionRangeInfo> m2) {
+
+        if (!m1.keySet().equals(m2.keySet())) {
+            logger.warn("Merging incompatible maps of DimensionRangeInfo, keys in m1 " + m1.keySet() + ", keys in m2 "
+                    + m2.keySet());
+        }
+
+        Map<String, DimensionRangeInfo> result = Maps.newHashMap();
+        
+        for (String colId : m1.keySet()) {
+            if (!m2.containsKey(colId))
+                continue;
+
+            DimensionRangeInfo r1 = m1.get(colId);
+            DimensionRangeInfo r2 = m2.get(colId);
+            
+            DimensionRangeInfo newR;
+            if (r1.getMin() == null && r1.getMax() == null) {
+                newR = r2; // when r1 is all null or has 0 records
+            } else if (r2.getMin() == null && r2.getMax() == null) {
+                newR = r1; // when r2 is all null or has 0 records
+            } else {
+                DataTypeOrder order = model.findColumn(colId).getType().getOrder();
+                String newMin = order.min(r1.getMin(), r2.getMin());
+                String newMax = order.max(r1.getMax(), r2.getMax());
+                newR = new DimensionRangeInfo(newMin, newMax);
+            }
+            
+            result.put(colId, newR);
+        }
+        
+        return result;
+    }
+    
+    // ============================================================================
+
+    @JsonProperty("min")
+    private String min;
+
+    @JsonProperty("max")
+    private String max;
+
+    public DimensionRangeInfo() {}
+
+    public DimensionRangeInfo(String min, String max) {
+        if (min == null && max != null || min != null && max == null)
+            throw new IllegalStateException("min and max must be both null or both set, got [" + min + ", " + max + "]");
+        
+        this.min = min;
+        this.max = max;
+    }
+
+    public String getMin() {
+        return min;
+    }
+
+    public String getMax() {
+        return max;
+    }
+    
+    @Override
+    public String toString() {
+        return "[" + min + ", " + max + "]";
+    }
+
+}
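mergeRangeMap by example: shared keys take the union of their ranges, a key
missing on either side is dropped from the result, and an all-null range (an
empty segment, or an all-null column) defers to the other side. Hypothetical
values:

    // [2012-01-01, 2012-05-31] merged with [2012-06-01, 2013-06-30]
    // yields [2012-01-01, 2013-06-30] for the shared column key.
    Map<String, DimensionRangeInfo> merged = DimensionRangeInfo.mergeRangeMap(model, m1, m2);
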
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
new file mode 100644
index 0000000..de77511
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentPruner {
+    private static final Logger logger = LoggerFactory.getLogger(SegmentPruner.class);
+
+    private final Set<CompareTupleFilter> mustTrueCompares;
+
+    public SegmentPruner(TupleFilter filter) {
+        this.mustTrueCompares = filter == null ? Collections.<CompareTupleFilter> emptySet()
+                : filter.findMustTrueCompareFilters();
+    }
+
+    public List<CubeSegment> listSegmentsForQuery(CubeInstance cube) {
+        List<CubeSegment> r = new ArrayList<>();
+        for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+            if (check(seg))
+                r.add(seg);
+        }
+        return r;
+    }
+    
+    public boolean check(CubeSegment seg) {
+        
+        if (seg.getInputRecords() == 0) {
+            if (seg.getConfig().isSkippingEmptySegments()) {
+                logger.debug("Prune segment {} due to 0 input record", seg);
+                return false;
+            } else {
+                logger.debug("Insist scan of segment {} having 0 input record", seg);
+            }
+        }
+
+        Map<String, DimensionRangeInfo> segDimRangeInfoMap = seg.getDimensionRangeInfoMap();
+        for (CompareTupleFilter comp : mustTrueCompares) {
+            TblColRef col = comp.getColumn();
+
+            DimensionRangeInfo dimRangeInfo = segDimRangeInfoMap.get(col.getIdentity());
+            if (dimRangeInfo == null)
+                dimRangeInfo = tryDeduceRangeFromPartitionCol(seg, col);
+            if (dimRangeInfo == null)
+                continue;
+            
+            String minVal = dimRangeInfo.getMin();
+            String maxVal = dimRangeInfo.getMax();
+            
+            if (!satisfy(comp, minVal, maxVal)) {
+                logger.debug("Prune segment {} due to given filter", seg);
+                return false;
+            }
+        }
+
+        logger.debug("Pruner passed on segment {}", seg);
+        return true;
+    }
+
+    private DimensionRangeInfo tryDeduceRangeFromPartitionCol(CubeSegment seg, TblColRef col) {
+        DataModelDesc model = seg.getModel();
+        PartitionDesc part = model.getPartitionDesc();
+        
+        if (!part.isPartitioned())
+            return null;
+        if (!col.equals(part.getPartitionDateColumnRef()))
+            return null;
+        
+        // deduce the dim range from TSRange
+        TSRange tsRange = seg.getTSRange();
+        if (tsRange.start.isMin || tsRange.end.isMax)
+            return null; // DimensionRangeInfo cannot express infinite
+        
+        String min = tsRangeToStr(tsRange.start.v, part);
+        String max = tsRangeToStr(tsRange.end.v - 1, part); // note the -1, end side is exclusive
+        return new DimensionRangeInfo(min, max);
+    }
+
+    private String tsRangeToStr(long ts, PartitionDesc part) {
+        String value;
+        DataType partitionColType = part.getPartitionDateColumnRef().getType();
+        if (partitionColType.isDate()) {
+            value = DateFormat.formatToDateStr(ts);
+        } else if (partitionColType.isTimeFamily()) {
+            value = DateFormat.formatToTimeWithoutMilliStr(ts);
+        } else if (partitionColType.isStringFamily() || partitionColType.isIntegerFamily()) {//integer like 20160101
+            String partitionDateFormat = part.getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat)) {
+                value = "" + ts;
+            } else {
+                value = DateFormat.formatToDateStr(ts, partitionDateFormat);
+            }
+        } else {
+            throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+        }
+        return value;
+    }
+
+    private boolean satisfy(CompareTupleFilter comp, String minVal, String maxVal) {
+
+        // When both min and max are null, it means all cells of the column are null.
+        // In such case, return true to let query engine scan the segment, since the
+        // result of null comparison is query engine specific.
+        if (minVal == null && maxVal == null)
+            return true;
+        
+        // pass on non-constant filter
+        if (comp.getChildren().size() > 1 && !(comp.getChildren().get(1) instanceof ConstantTupleFilter))
+            return true;
+
+        TblColRef col = comp.getColumn();
+        DataTypeOrder order = col.getType().getOrder();
+        String filterVal = toString(comp.getFirstValue());
+        
+        switch (comp.getOperator()) {
+        case EQ:
+        case IN:
+            String filterMin = order.min((Set<String>) comp.getValues());
+            String filterMax = order.max((Set<String>) comp.getValues());
+            return order.compare(filterMin, maxVal) <= 0 && order.compare(minVal, filterMax) <= 0;
+        case LT:
+            return order.compare(minVal, filterVal) < 0;
+        case LTE:
+            return order.compare(minVal, filterVal) <= 0;
+        case GT:
+            return order.compare(maxVal, filterVal) > 0;
+        case GTE:
+            return order.compare(maxVal, filterVal) >= 0;
+        case NEQ:
+        case NOTIN:
+        case ISNULL:
+        case ISNOTNULL:
+        default:
+            return true;
+        }
+    }
+
+    private String toString(Object v) {
+        return v == null ? null : v.toString();
+    }
+}
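The EQ/IN branch of satisfy() above is an interval-overlap test: the segment
survives exactly when [filterMin, filterMax] intersects the segment's
[minVal, maxVal]. The same predicate on plain longs, for intuition only:

    // Two closed intervals overlap iff each one starts no later than
    // the other one ends.
    static boolean overlaps(long filterMin, long filterMax, long segMin, long segMax) {
        return filterMin <= segMax && segMin <= filterMax;
    }
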
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
index 811d512..97e4dc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.metadata.expression.TupleExpression;
@@ -49,14 +48,13 @@ public abstract class ScanRangePlannerBase {
     //GT 
     protected GTInfo gtInfo;
     protected TupleFilter gtFilter;
-    protected Pair<ByteArray, ByteArray> gtStartAndEnd;
-    protected TblColRef gtPartitionCol;
     protected ImmutableBitSet gtDimensions;
     protected ImmutableBitSet gtAggrGroups;
     protected ImmutableBitSet gtAggrMetrics;
     protected String[] gtAggrFuncs;
     protected TupleFilter havingFilter;
     protected boolean isPartitionColUsingDatetimeEncoding = true;
+    protected int onlyShardId = -1;
 
     protected RecordComparator rangeStartComparator;
     protected RecordComparator rangeEndComparator;
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
index fc34f37..2ff7ee0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -43,7 +43,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class);
     
-    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
     public static final String[] END_ROW = new String[0];
     public static final String[] CUT_ROW = { "" };
 
@@ -149,7 +148,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
 
     private void initNullBytes(CubeDesc cubeDesc) {
         nullBytes = Lists.newArrayList();
-        nullBytes.add(HIVE_NULL);
         String[] nullStrings = cubeDesc.getNullStrings();
         if (nullStrings != null) {
             for (String s : nullStrings) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
deleted file mode 100644
index 3f4f6f4..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.kv;
-
-import java.util.Collection;
-import java.util.Comparator;
-
-import org.apache.kylin.metadata.datatype.DataType;
-
-/**
- * @author yangli9
- */
-abstract public class RowKeyColumnOrder implements Comparator<String> {
-
-    public static final NumberOrder NUMBER_ORDER = new NumberOrder();
-    public static final StringOrder STRING_ORDER = new StringOrder();
-
-    public static RowKeyColumnOrder getInstance(DataType type) {
-        if (type.isNumberFamily() || type.isDateTimeFamily())
-            return NUMBER_ORDER;
-        else
-            return STRING_ORDER;
-    }
-
-    public String max(Collection<String> values) {
-        String max = null;
-        for (String v : values) {
-            if (max == null || compare(max, v) < 0)
-                max = v;
-        }
-        return max;
-    }
-
-    public String min(Collection<String> values) {
-        String min = null;
-        for (String v : values) {
-            if (min == null || compare(min, v) > 0)
-                min = v;
-        }
-        return min;
-    }
-
-    public String min(String v1, String v2) {
-        if (v1 == null)
-            return v2;
-        else if (v2 == null)
-            return v1;
-        else
-            return compare(v1, v2) <= 0 ? v1 : v2;
-    }
-
-    public String max(String v1, String v2) {
-        if (v1 == null)
-            return v2;
-        else if (v2 == null)
-            return v1;
-        else
-            return compare(v1, v2) >= 0 ? v1 : v2;
-    }
-
-    @Override
-    public int compare(String o1, String o2) {
-        // consider null
-        if (o1 == o2)
-            return 0;
-        if (o1 == null)
-            return -1;
-        if (o2 == null)
-            return 1;
-
-        return compareNonNull(o1, o2);
-    }
-
-    abstract int compareNonNull(String o1, String o2);
-
-    private static class StringOrder extends RowKeyColumnOrder {
-        @Override
-        public int compareNonNull(String o1, String o2) {
-            return o1.compareTo(o2);
-        }
-    }
-
-    private static class NumberOrder extends RowKeyColumnOrder {
-        @Override
-        public int compareNonNull(String o1, String o2) {
-            double d1 = Double.parseDouble(o1);
-            double d2 = Double.parseDouble(o2);
-            return Double.compare(d1, d2);
-        }
-    }
-
-}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 2ab7aac..467a294 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.common.util.BytesSplitter;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -147,16 +146,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
         return factColumns;
     }
 
-    // sanity check the input record (in bytes) matches what's expected
-    public void sanityCheck(BytesSplitter bytesSplitter) {
-        if (columnCount != bytesSplitter.getBufferSize()) {
-            throw new IllegalArgumentException("Expect " + columnCount + " columns, but see "
-                    + bytesSplitter.getBufferSize() + " -- " + bytesSplitter);
-        }
-
-        // TODO: check data types here
+    public int getColumnCount() {
+        return columnCount;
     }
-
+    
     @Override
     public String getTableName() {
         return tableName;
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
new file mode 100644
index 0000000..43175c9
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DimensionRangeInfoTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMergeRangeMap() {
+        DataModelDesc model = DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model");
+        String colId = "TEST_KYLIN_FACT.CAL_DT";
+
+        // normal merge
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+            m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+            DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+            Assert.assertEquals("2012-01-01", r1.getMin());
+            Assert.assertEquals("2013-06-30", r1.getMax());
+        }
+        
+        // missing column on one side
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+
+            Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m1, m2).isEmpty());
+            Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m2, m1).isEmpty());
+        }
+        
+        // null min/max value (happens on empty segment, or all-null columns)
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo(null, null));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+            m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+            DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+            Assert.assertEquals("2012-06-01", r1.getMin());
+            Assert.assertEquals("2013-06-30", r1.getMax());
+        }
+        
+    }
+}
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
new file mode 100644
index 0000000..603db97
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.SetAndUnsetSystemProp;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class SegmentPrunerTest extends LocalFileMetadataTestCase {
+    private CubeInstance cube;
+
+    @Before
+    public void setUp() {
+        this.createTestMetadata();
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("ssb_cube_with_dimention_range");
+    }
+
+    @After
+    public void after() {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEmptySegment() {
+        CubeSegment seg = cube.getFirstSegment();
+        TblColRef col = cube.getModel().findColumn("CUSTOMER.C_NATION");
+
+        // a normal hit
+        TupleFilter f = compare(col, FilterOperatorEnum.EQ, "CHINA");
+        SegmentPruner segmentPruner = new SegmentPruner(f);
+        Assert.assertTrue(segmentPruner.check(seg));
+
+        // make the segment empty, it should be pruned
+        seg.setInputRecords(0);
+        Assert.assertFalse(segmentPruner.check(seg));
+    }
+
+    @Test
+    public void testDimensionRangeCheck() {
+        CubeSegment cubeSegment = cube.getSegments().getFirstSegment();
+
+        //integer
+        TblColRef qtyCol = cube.getModel().findColumn("V_LINEORDER.LO_QUANTITY");
+        TblColRef revCol = cube.getModel().findColumn("V_LINEORDER.V_REVENUE");
+        
+        TupleFilter constFilter_LO_QUANTITY0 = new ConstantTupleFilter(Sets.newHashSet("8", "18", "28")); // between min and max value
+        TupleFilter constFilter_LO_QUANTITY1 = new ConstantTupleFilter("1"); // min value
+        TupleFilter constFilter_LO_QUANTITY2 = new ConstantTupleFilter("50"); // max value
+        TupleFilter constFilter_LO_QUANTITY3 = new ConstantTupleFilter("0"); // below min value
+        TupleFilter constFilter_LO_QUANTITY4 = new ConstantTupleFilter("200"); // above max value
+        TupleFilter constFilter_LO_QUANTITY5 = new ConstantTupleFilter(Sets.newHashSet("51", "52", "53")); // above max value
+
+        // non-constant filter
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.EQ, revCol);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+        
+        // is null
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.ISNULL);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lt min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //lte min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lt max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //gt max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //gte max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //gt min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //in over-max values
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY5);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //in normal values
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY0);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lte under-min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY3);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //gte over-max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY4);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+    }
+
+    @Test
+    public void testLegacyCubeSeg() {
+        // legacy cube segments do not have DimensionRangeInfo, but the TSRange still allows some pruning
+        CubeInstance cube = CubeManager.getInstance(getTestConfig())
+                .getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
+
+        TblColRef col = cube.getModel().findColumn("TEST_KYLIN_FACT.CAL_DT");
+        CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
+        TSRange tsRange = seg.getTSRange();
+        long start = tsRange.start.v;
+
+        try (SetAndUnsetSystemProp sns = new SetAndUnsetSystemProp("kylin.query.skip-empty-segments", "false")) {
+            {
+                TupleFilter f = compare(col, FilterOperatorEnum.LTE, start);
+                SegmentPruner segmentPruner = new SegmentPruner(f);
+                Assert.assertTrue(segmentPruner.check(seg));
+            }
+            {
+                TupleFilter f = compare(col, FilterOperatorEnum.LT, start);
+                SegmentPruner segmentPruner = new SegmentPruner(f);
+                Assert.assertFalse(segmentPruner.check(seg));
+            }
+        }
+    }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
index 5ccc1f3..d9eefb5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.metadata.datatype;
 
 import java.io.Serializable;
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -35,7 +34,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
 
@@ -162,6 +160,7 @@ public class DataType implements Serializable {
     private String name;
     private int precision;
     private int scale;
+    private transient DataTypeOrder order;
 
     public DataType(String name, int precision, int scale) {
         this.name = name;
@@ -224,24 +223,17 @@ public class DataType implements Serializable {
                 scale = KylinConfig.getInstanceFromEnv().getDefaultDecimalScale();
             }
         }
-
     }
 
+    public DataTypeOrder getOrder() {
+        if (order == null)
+            order = DataTypeOrder.getInstance(this);
+        
+        return order;
+    }
+    
     public int compare(String value1, String value2) {
-        if (isDateTimeFamily()) {
-            Long millis1 = DateFormat.stringToMillis(value1);
-            Long millis2 = DateFormat.stringToMillis(value2);
-            return millis1.compareTo(millis2);
-        } else if (isIntegerFamily()) {
-            Long l1 = new Long(value1);
-            Long l2 = new Long(value2);
-            return l1.compareTo(l2);
-        } else if (isNumberFamily()) {
-            BigDecimal bigDecimal1 = new BigDecimal(value1);
-            BigDecimal bigDecimal2 = new BigDecimal(value2);
-            return bigDecimal1.compareTo(bigDecimal2);
-        }
-        return value1.compareTo(value2);
+        return getOrder().compare(value1, value2);
     }
 
     private String replaceLegacy(String str) {
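Net effect of this hunk: the type-aware comparison logic moves into the new
DataTypeOrder class, and the comparator is cached in a transient field, so it
is lazily rebuilt after deserialization. Illustrative:

    DataType intType = DataType.getType("integer");
    intType.compare("9", "10"); // negative: numeric order, not lexicographic "10" < "9"
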
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java
new file mode 100644
index 0000000..091e2ae
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.kylin.common.util.DateFormat;
+
+/**
+ * Define order for string literals based on its underlying data type.
+ * 
+ * Null is the smallest.
+ */
+public abstract class DataTypeOrder implements Comparator<String> {
+
+    public static final DataTypeOrder INTEGER_ORDER = new IntegerOrder();
+    public static final DataTypeOrder DOUBLE_ORDER = new DoubleOrder();
+    public static final DataTypeOrder DECIMAL_ORDER = new DecimalOrder();
+    public static final DataTypeOrder DATETIME_ORDER = new DateTimeOrder();
+    public static final DataTypeOrder STRING_ORDER = new StringOrder();
+
+    // package private, access via DataType.getOrder()
+    static DataTypeOrder getInstance(DataType type) throws IllegalArgumentException {
+        if (type.isStringFamily())
+            return STRING_ORDER;
+        else if (type.isDateTimeFamily())
+            return DATETIME_ORDER;
+        else if (type.isIntegerFamily())
+            return INTEGER_ORDER;
+        else if (type.isFloat() || type.isDouble())
+            return DOUBLE_ORDER;
+        else if (type.isDecimal())
+            return DECIMAL_ORDER;
+        else
+            throw new IllegalArgumentException("Unsupported data type " + type);
+    }
+
+    public String max(Collection<String> values) {
+        String max = null;
+        for (String v : values) {
+            max = max(max, v);
+        }
+        return max;
+    }
+
+    public String min(Collection<String> values) {
+        String min = null;
+        for (String v : values) {
+            min = min(min, v);
+        }
+        return min;
+    }
+
+    public String min(String v1, String v2) {
+        if (v1 == null)
+            return v2;
+        else if (v2 == null)
+            return v1;
+        else
+            return compare(v1, v2) <= 0 ? v1 : v2;
+    }
+
+    public String max(String v1, String v2) {
+        if (v1 == null)
+            return v2;
+        else if (v2 == null)
+            return v1;
+        else
+            return compare(v1, v2) >= 0 ? v1 : v2;
+    }
+
+    @Override
+    public int compare(String s1, String s2) {
+        Comparable o1 = toComparable(s1);
+        Comparable o2 = toComparable(s2);
+        
+        // consider null
+        if (o1 == o2)
+            return 0;
+        if (o1 == null)
+            return -1;
+        if (o2 == null)
+            return 1;
+
+        return o1.compareTo(o2);
+    }
+
+    abstract Comparable toComparable(String s);
+
+    private static class StringOrder extends DataTypeOrder {
+        @Override
+        public String toComparable(String s) {
+            return s;
+        }
+    }
+
+    private static class IntegerOrder extends DataTypeOrder {
+        @Override
+        public Long toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return Long.parseLong(s);
+        }
+    }
+
+    private static class DoubleOrder extends DataTypeOrder {
+        @Override
+        public Double toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return Double.parseDouble(s);
+        }
+    }
+    
+    private static class DecimalOrder extends DataTypeOrder {
+        @Override
+        public BigDecimal toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return new BigDecimal(s);
+        }
+    }
+    
+    private static class DateTimeOrder extends DataTypeOrder {
+        @Override
+        public Long toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return DateFormat.stringToMillis(s);
+        }
+    }
+    
+}
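Note the null handling: toComparable() maps null or empty strings to null,
compare() sorts null lowest, and min/max skip nulls rather than return them.
Illustrative expectations, assuming a Kylin classpath:

    DataTypeOrder order = DataType.getType("bigint").getOrder();
    order.compare(null, "0");            // -1: null is the smallest
    order.min("5", null);                // "5": nulls are skipped, not returned
    order.max(Arrays.asList("9", "10")); // "10": numeric order, not lexicographic
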
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 16f165a..2c75ec1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -33,7 +33,7 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
  * @author xjiang
  */
 public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupleFilter {
-
+    
     public enum CompareResultType {
         AlwaysTrue, AlwaysFalse, Unknown
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 09b41f5..672aba0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.filter;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * 
@@ -81,6 +83,44 @@ public abstract class TupleFilter {
         SWAP_OP_MAP.put(FilterOperatorEnum.LT, FilterOperatorEnum.GT);
         SWAP_OP_MAP.put(FilterOperatorEnum.GTE, FilterOperatorEnum.LTE);
     }
+    
+    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op) {
+        CompareTupleFilter r = new CompareTupleFilter(op);
+        r.addChild(new ColumnTupleFilter(col));
+        return r;
+    }
+    
+    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object val) {
+        CompareTupleFilter r = new CompareTupleFilter(op);
+        r.addChild(new ColumnTupleFilter(col));
+        if (val instanceof TupleFilter)
+            r.addChild((TupleFilter) val);
+        else if (val instanceof TblColRef)
+            r.addChild(new ColumnTupleFilter((TblColRef) val));
+        else
+            r.addChild(new ConstantTupleFilter(val));
+        return r;
+    }
+
+    public static LogicalTupleFilter and(TupleFilter... children) {
+        LogicalTupleFilter r = new LogicalTupleFilter(FilterOperatorEnum.AND);
+        r.addChildren(children);
+        return r;
+    }
+    
+    public static LogicalTupleFilter or(TupleFilter... children) {
+        LogicalTupleFilter r = new LogicalTupleFilter(FilterOperatorEnum.OR);
+        r.addChildren(children);
+        return r;
+    }
+    
+    public static LogicalTupleFilter not(TupleFilter child) {
+        LogicalTupleFilter r = new LogicalTupleFilter(FilterOperatorEnum.NOT);
+        r.addChild(child);
+        return r;
+    }
+    
+    // ============================================================================
 
     protected final List<TupleFilter> children;
     protected FilterOperatorEnum operator;
@@ -245,7 +285,61 @@ public abstract class TupleFilter {
         }
         return oldProductFilters;
     }
+    
+    public HashMap<TblColRef, Object> findMustEqualColsAndValues(Collection<TblColRef> lookingForCols) {
+        HashMap<TblColRef, Object> result = new HashMap<>();
+        findMustEqualColsAndValues(this, lookingForCols, result);
+        return result;
+    }
 
+    private void findMustEqualColsAndValues(TupleFilter filter, Collection<TblColRef> lookingForCols, HashMap<TblColRef, Object> result) {
+        if (filter instanceof CompareTupleFilter) {
+            CompareTupleFilter comp = (CompareTupleFilter) filter;
+            TblColRef col = comp.getColumn();
+            if (lookingForCols.contains(col)) {
+                if (comp.getOperator() == FilterOperatorEnum.EQ)
+                    result.put(col, comp.getFirstValue());
+                else if (comp.getOperator() == FilterOperatorEnum.ISNULL)
+                    result.put(col, null);
+            }
+            return;
+        }
+
+        if (filter instanceof LogicalTupleFilter) {
+            LogicalTupleFilter logic = (LogicalTupleFilter) filter;
+            if (logic.getOperator() == FilterOperatorEnum.AND) {
+                for (TupleFilter child : logic.getChildren())
+                    findMustEqualColsAndValues(child, lookingForCols, result);
+            }
+            return;
+        }
+    }
+
+    // find the CompareTupleFilters that must hold whenever the whole filter holds
+    public Set<CompareTupleFilter> findMustTrueCompareFilters() {
+        Set<CompareTupleFilter> result = Sets.newHashSet();
+        findMustTrueCompareFilters(this, result);
+        return result;
+    }
+
+    private void findMustTrueCompareFilters(TupleFilter filter, Set<CompareTupleFilter> result) {
+        if (filter instanceof CompareTupleFilter) {
+            if (((CompareTupleFilter) filter).getColumn() != null) {
+                result.add((CompareTupleFilter) filter);
+            }
+            return;
+        }
+        
+        if (filter instanceof LogicalTupleFilter) {
+            if (filter.getOperator() == FilterOperatorEnum.AND) {
+                for (TupleFilter child : filter.getChildren()) {
+                    findMustTrueCompareFilters(child, result);
+                }
+            }
+            return;
+        }
+    }
+    
     public abstract boolean isEvaluable();
 
     public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
@@ -284,26 +378,4 @@ public abstract class TupleFilter {
         }
     }
 
-    public static TupleFilter and(TupleFilter f1, TupleFilter f2) {
-        if (f1 == null)
-            return f2;
-        if (f2 == null)
-            return f1;
-
-        if (f1.getOperator() == FilterOperatorEnum.AND) {
-            f1.addChild(f2);
-            return f1;
-        }
-
-        if (f2.getOperator() == FilterOperatorEnum.AND) {
-            f2.addChild(f1);
-            return f2;
-        }
-
-        LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND);
-        and.addChild(f1);
-        and.addChild(f2);
-        return and;
-    }
-
 }
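With the new static builders, filters compose as expressions, and
findMustTrueCompareFilters() descends only through AND nodes, because a compare
sitting under OR or NOT need not hold for the whole filter to hold. The column
refs below are hypothetical:

    TupleFilter f = and(compare(colA, FilterOperatorEnum.GT, "10"),
                        or(compare(colB, FilterOperatorEnum.EQ, "x"),
                           compare(colC, FilterOperatorEnum.EQ, "y")));
    f.findMustTrueCompareFilters(); // contains only the GT compare on colA
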
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
new file mode 100644
index 0000000..7666ffe
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class DataTypeOrderTest {
+    @Test
+    public void testDataTypeOrder() {
+        DataType intType = DataType.getType("integer");
+        DataTypeOrder dataTypeOrder = intType.getOrder();
+        Set<String> integers = Sets.newHashSet("100000", "2", "1000", "100", "77", "10", "9", "2000000", "-10000", "0");
+        Assert.assertEquals("2000000", dataTypeOrder.max(integers));
+        Assert.assertEquals("-10000", dataTypeOrder.min(integers));
+
+        DataType doubleType = DataType.getType("double");
+        dataTypeOrder = doubleType.getOrder();
+        Set<String> doubles = Sets.newHashSet("1.1", "-299.5", "100000", "1.000", "4.000000001", "0.00", "-1000000.231231", "8000000",
+                "10", "10.00");
+        Assert.assertEquals("8000000", dataTypeOrder.max(doubles));
+        Assert.assertEquals("-1000000.231231", dataTypeOrder.min(doubles));
+
+        DataType datetimeType = DataType.getType("date");
+        dataTypeOrder = datetimeType.getOrder();
+        Set<String> datetimes = Sets.newHashSet("2010-01-02", "2888-08-09", "2018-05-26", "1527512082000", "2010-02-03 23:59:59",
+                "2000-12-12 12:00:00", "1970-01-19 00:18:32", "1998-12-02", "2018-05-28 10:00:00.255", "1995-09-20 20:00:00.220");
+        Assert.assertEquals("2888-08-09", dataTypeOrder.max(datetimes));
+        Assert.assertEquals("1970-01-19 00:18:32", dataTypeOrder.min(datetimes));
+
+        DataType stringType = new DataType("varchar", 256, 10);
+        dataTypeOrder = stringType.getOrder();
+        Set<String> strings = Sets.newHashSet(null, "", "中国", "China No.1", "神兽麒麟", "Rocket", "Apache Kylin", "google", "NULL",
+                "empty");
+        Assert.assertEquals("神兽麒麟", dataTypeOrder.max(strings));
+        Assert.assertEquals("", dataTypeOrder.min(strings));
+    }
+}
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
index e17f4d1..95609eb 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
@@ -18,9 +18,21 @@
 
 package org.apache.kylin.metadata.filter;
 
+import static org.apache.kylin.metadata.filter.TupleFilter.and;
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+import static org.apache.kylin.metadata.filter.TupleFilter.not;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class TupleFilterTest {
     @Test
     // true ==> true
@@ -62,4 +74,69 @@ public class TupleFilterTest {
         andFilter2.addChildren(ConstantTupleFilter.FALSE, ConstantTupleFilter.FALSE);
         Assert.assertEquals(andFilter2, andFilter.removeNot());
     }
+    
+    @Test
+    public void testFindMustEqualColsAndValues() {
+        TableDesc tbl = TableDesc.mockup("mockup_table");
+        TblColRef colA = TblColRef.mockup(tbl, 0, "A", "bigint");
+        TblColRef colB = TblColRef.mockup(tbl, 1, "B", "char(256)");
+        Set<TblColRef> cols = Sets.newHashSet(colA, colB);
+        
+        {
+            TupleFilter f = compare(colA, FilterOperatorEnum.EQ, "1234");
+            Assert.assertEquals(map(colA, "1234"), f.findMustEqualColsAndValues(cols));
+        }
+        
+        {
+            TupleFilter f = compare(colA, FilterOperatorEnum.ISNULL);
+            Assert.assertEquals(map(colA, null), f.findMustEqualColsAndValues(cols));
+        }
+        
+        {
+            TupleFilter f = and(compare(colA, FilterOperatorEnum.ISNULL), compare(colB, FilterOperatorEnum.EQ, "1234"));
+            Assert.assertEquals(map(colA, null, colB, "1234"), f.findMustEqualColsAndValues(cols));
+            Assert.assertTrue(not(f).findMustEqualColsAndValues(cols).isEmpty());
+        }
+        
+        {
+            TupleFilter f = compare(colA, FilterOperatorEnum.LT, "1234");
+            Assert.assertTrue(f.findMustEqualColsAndValues(cols).isEmpty());
+        }
+    }
+
+    private HashMap<TblColRef, Object> map(TblColRef col, String v) {
+        HashMap<TblColRef, Object> r = new HashMap<>();
+        r.put(col, v);
+        return r;
+    }
+    
+    private HashMap<TblColRef, Object> map(TblColRef col, String v, TblColRef col2, String v2) {
+        HashMap<TblColRef, Object> r = new HashMap<>();
+        r.put(col, v);
+        r.put(col2, v2);
+        return r;
+    }
+
+    @Test
+    public void testMustTrueTupleFilter() {
+        TupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+        TupleFilter andFilter2  = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+        TupleFilter orFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+        andFilter.addChild(andFilter2);
+        andFilter.addChild(orFilter);
+
+        Set<CompareTupleFilter> trueTupleFilters = andFilter.findMustTrueCompareFilters();
+        Assert.assertTrue(trueTupleFilters.isEmpty());
+
+        TupleFilter compFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        compFilter.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test1", TblColRef.InnerDataTypeEnum.LITERAL)));
+        TupleFilter compFilter2 = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        compFilter2.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test2", TblColRef.InnerDataTypeEnum.LITERAL)));
+        andFilter2.addChild(compFilter);
+        orFilter.addChild(compFilter2);
+        Assert.assertEquals(Sets.newHashSet(compFilter), andFilter.findMustTrueCompareFilters());
+        
+        Assert.assertEquals(Sets.newHashSet(compFilter2), compFilter2.findMustTrueCompareFilters());
+    }
+    
 }
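
The new test pins down TupleFilter.findMustEqualColsAndValues(): it collects the EQ and ISNULL bindings that must hold for the whole filter to be true (AND preserves them, NOT and OR discard them). Below is a minimal sketch, not part of this patch, of how such a must-equal map typically drives segment pruning; Range is a simplified stand-in for DimensionRangeInfo and plain String ordering stands in for the type-aware DataTypeOrder.

    import java.util.HashMap;
    import java.util.Map;

    public class MustEqualPruneSketch {

        // Simplified stand-in for DimensionRangeInfo: the [min, max] a column
        // takes within one segment.
        static class Range {
            final String min, max;
            Range(String min, String max) { this.min = min; this.max = max; }
            boolean contains(String v) {
                return min.compareTo(v) <= 0 && v.compareTo(max) <= 0;
            }
        }

        // A segment is skippable if any must-equal value falls outside the
        // segment's recorded [min, max] for that column.
        static boolean canSkip(Map<String, String> mustEqual, Map<String, Range> segRanges) {
            for (Map.Entry<String, String> e : mustEqual.entrySet()) {
                if (e.getValue() == null)
                    continue; // ISNULL binding; needs null-count info, out of scope here
                Range r = segRanges.get(e.getKey());
                if (r != null && !r.contains(e.getValue()))
                    return true;
            }
            return false;
        }

        public static void main(String[] args) {
            Map<String, String> mustEqual = new HashMap<>();
            mustEqual.put("A", "1234");
            Map<String, Range> seg = new HashMap<>();
            seg.put("A", new Range("2000", "3000"));
            System.out.println(canSkip(mustEqual, seg)); // true: "1234" < "2000"
        }
    }
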
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 5f86a45..229ef01 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -31,7 +31,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.FuzzyValueCombination;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,7 +38,6 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.RecordComparators;
 import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
-import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTInfo;
@@ -137,15 +135,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
         this.gtRtAggrMetrics = mapping.makeGridTableColumns(tmpRtAggrMetrics);
 
-        if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) {
-            int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef());
-            if (index >= 0) {
-                SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo);
-                this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index);
-                this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index);
-                this.gtPartitionCol = gtInfo.colRef(index);
-            }
-        }
+        this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
+        this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
+        this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);
     }
 
     protected StorageContext context;
@@ -153,7 +145,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     /**
      * Construct  GTScanRangePlanner with incomplete information. For UT only.
      */
-    public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
+    public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
         this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
         this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
@@ -170,8 +162,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
 
         this.gtFilter = gtFilter;
-        this.gtStartAndEnd = gtStartAndEnd;
-        this.gtPartitionCol = gtPartitionCol;
     }
 
     public GTScanRequest planScanRequest() {
@@ -237,18 +227,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
 
         for (ColumnRange range : andDimRanges) {
-            if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
-                int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
-                int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end);
-
-                if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) {
-                    //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition. 
-                } else {
-                    logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
-                            gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end));
-                    return null;
-                }
-            }
 
             int col = range.column.getColumnDesc().getZeroBasedIndex();
             if (!gtInfo.getPrimaryKey().get(col))
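
The block deleted above pre-checked the partition-column range against the segment's [start, end) span and returned null, meaning skip the segment, when they could not overlap; SegmentPruner now owns that decision before the planner runs. For reference, the overlap test the deleted code encoded reduces to the sketch below (dict-encoding rounding is ignored; a null bound means unbounded):

    public class OverlapSketch {

        // Segment span is [segStart, segEnd); query range is [begin, end] with
        // null meaning unbounded. True when the two can intersect.
        static boolean overlaps(long segStart, long segEnd, Long begin, Long end) {
            boolean startOk = (begin == null) || (begin < segEnd);
            boolean endOk = (end == null) || (segStart <= end);
            return startOk && endOk;
        }

        public static void main(String[] args) {
            System.out.println(overlaps(100, 200, 150L, 300L)); // true
            System.out.println(overlaps(100, 200, 200L, 300L)); // false: 200 is past [100, 200)
        }
    }
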
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 352a868..269833f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.RawQueryLastHacker;
+import org.apache.kylin.cube.common.SegmentPruner;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
@@ -87,14 +88,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
-        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+        SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
+        for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
             CubeSegmentScanner scanner;
 
-            if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
-                logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
-                continue;
-            }
-
             scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
                     request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
                     request.getMetrics(), request.getDynFuncs(), //
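
With this hunk the query entry point no longer walks every READY segment and special-cases empty ones; SegmentPruner.listSegmentsForQuery() decides up front which segments can contribute rows. The sketch below approximates that flow and is not the actual SegmentPruner code; rangeOverlapsFilter() abstracts the per-dimension min/max check, and the empty-segment skip is assumed to have moved into the pruner.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SegmentPruneFlowSketch {

        interface Segment {
            long inputRecords();
            boolean rangeOverlapsFilter(); // stand-in for the min/max vs. filter check
        }

        // Keep only segments that can possibly contain matching rows.
        static List<Segment> listSegmentsForQuery(List<Segment> ready) {
            List<Segment> kept = new ArrayList<>();
            for (Segment seg : ready) {
                if (seg.inputRecords() == 0)
                    continue; // empty segment, nothing to scan
                if (!seg.rangeOverlapsFilter())
                    continue; // filter can never match this segment's value ranges
                kept.add(seg);
            }
            return kept;
        }

        static Segment seg(long records, boolean overlaps) {
            return new Segment() {
                public long inputRecords() { return records; }
                public boolean rangeOverlapsFilter() { return overlaps; }
            };
        }

        public static void main(String[] args) {
            List<Segment> ready = Arrays.asList(seg(0, true), seg(100, false), seg(100, true));
            System.out.println(listSegmentsForQuery(ready).size()); // 1
        }
    }
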
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
deleted file mode 100644
index 56b1106..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Sets;
-
-/**
- * 
- * @author xjiang
- * 
- */
-public class ColumnValueRange {
-    private TblColRef column;
-    private RowKeyColumnOrder order;
-    private String beginValue;
-    private String endValue;
-    private Set<String> equalValues;
-
-    public ColumnValueRange(TblColRef column, Collection<String> values, FilterOperatorEnum op) {
-        this.column = column;
-        this.order = RowKeyColumnOrder.getInstance(column.getType());
-
-        switch (op) {
-        case EQ:
-        case IN:
-            equalValues = new HashSet<String>(values);
-            refreshBeginEndFromEquals();
-            break;
-        case LT:
-        case LTE:
-            endValue = order.max(values);
-            break;
-        case GT:
-        case GTE:
-            beginValue = order.min(values);
-            break;
-        case NEQ:
-        case NOTIN:
-        case ISNULL: // TODO ISNULL worth pass down as a special equal value
-        case ISNOTNULL:
-            // let Optiq filter it!
-            break;
-        default:
-            throw new UnsupportedOperationException(op.name());
-        }
-    }
-
-    public ColumnValueRange(TblColRef column, String beginValue, String endValue, Set<String> equalValues) {
-        copy(column, beginValue, endValue, equalValues);
-    }
-
-    void copy(TblColRef column, String beginValue, String endValue, Set<String> equalValues) {
-        this.column = column;
-        this.order = RowKeyColumnOrder.getInstance(column.getType());
-        this.beginValue = beginValue;
-        this.endValue = endValue;
-        this.equalValues = equalValues;
-    }
-
-    public TblColRef getColumn() {
-        return column;
-    }
-
-    public String getBeginValue() {
-        return beginValue;
-    }
-
-    public String getEndValue() {
-        return endValue;
-    }
-
-    public Set<String> getEqualValues() {
-        return equalValues;
-    }
-
-    private void refreshBeginEndFromEquals() {
-        this.beginValue = order.min(this.equalValues);
-        this.endValue = order.max(this.equalValues);
-    }
-
-    public boolean satisfyAll() {
-        return beginValue == null && endValue == null && equalValues == null; // the NEQ case
-    }
-
-    public boolean satisfyNone() {
-        if (equalValues != null) {
-            return equalValues.isEmpty();
-        } else if (beginValue != null && endValue != null) {
-            return order.compare(beginValue, endValue) > 0;
-        } else {
-            return false;
-        }
-    }
-
-    public void andMerge(ColumnValueRange another) {
-        assert this.column.equals(another.column);
-
-        if (another.satisfyAll()) {
-            return;
-        }
-
-        if (this.satisfyAll()) {
-            copy(another.column, another.beginValue, another.endValue, another.equalValues);
-            return;
-        }
-
-        if (this.equalValues != null && another.equalValues != null) {
-            this.equalValues.retainAll(another.equalValues);
-            refreshBeginEndFromEquals();
-            return;
-        }
-
-        if (this.equalValues != null) {
-            this.equalValues = filter(this.equalValues, another.beginValue, another.endValue);
-            refreshBeginEndFromEquals();
-            return;
-        }
-
-        if (another.equalValues != null) {
-            this.equalValues = filter(another.equalValues, this.beginValue, this.endValue);
-            refreshBeginEndFromEquals();
-            return;
-        }
-
-        this.beginValue = order.max(this.beginValue, another.beginValue);
-        this.endValue = order.min(this.endValue, another.endValue);
-    }
-
-    private Set<String> filter(Set<String> equalValues, String beginValue, String endValue) {
-        Set<String> result = Sets.newHashSetWithExpectedSize(equalValues.size());
-        for (String v : equalValues) {
-            if (between(v, beginValue, endValue)) {
-                result.add(v);
-            }
-        }
-        return equalValues;
-    }
-
-    private boolean between(String v, String beginValue, String endValue) {
-        return (beginValue == null || order.compare(beginValue, v) <= 0) && (endValue == null || order.compare(v, endValue) <= 0);
-    }
-
-    // remove invalid EQ/IN values and round start/end according to dictionary
-    public void preEvaluateWithDict(Dictionary<String> dict) {
-        if (dict == null || dict.getSize() == 0)
-            return;
-
-        if (equalValues != null) {
-            Iterator<String> it = equalValues.iterator();
-            while (it.hasNext()) {
-                String v = it.next();
-                try {
-                    dict.getIdFromValue(v);
-                } catch (IllegalArgumentException e) {
-                    // value not in dictionary
-                    it.remove();
-                }
-            }
-            refreshBeginEndFromEquals();
-        }
-
-        if (beginValue != null) {
-            try {
-                beginValue = dict.getValueFromId(dict.getIdFromValue(beginValue, 1));
-            } catch (IllegalArgumentException e) {
-                // beginValue is greater than the biggest in dictionary, mark FALSE
-                equalValues = Sets.newHashSet();
-            }
-        }
-
-        if (endValue != null) {
-            try {
-                endValue = dict.getValueFromId(dict.getIdFromValue(endValue, -1));
-            } catch (IllegalArgumentException e) {
-                // endValue is lesser than the smallest in dictionary, mark FALSE
-                equalValues = Sets.newHashSet();
-            }
-        }
-    }
-
-    public String toString() {
-        if (equalValues == null) {
-            return column.getName() + " between " + beginValue + " and " + endValue;
-        } else {
-            return column.getName() + " in " + equalValues;
-        }
-    }
-}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index 7fa426f..4a80d29 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -24,10 +24,10 @@ import java.util.Set;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.kv.RowKeyColumnOrder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
 import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -164,9 +164,9 @@ public class DerivedFilterTranslator {
 
     private static void findMinMax(Set<Array<String>> satisfyingHostRecords, TblColRef[] hostCols, String[] min, String[] max) {
 
-        RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length];
+        DataTypeOrder[] orders = new DataTypeOrder[hostCols.length];
         for (int i = 0; i < hostCols.length; i++) {
-            orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType());
+            orders[i] = hostCols[i].getType().getOrder();
         }
 
         for (Array<String> rec : satisfyingHostRecords) {
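
findMinMax() now takes its per-column comparator from DataTypeOrder (via TblColRef.getType().getOrder()) instead of the removed RowKeyColumnOrder. What a type-aware order buys is shown by this small sketch (comparator names are illustrative, not Kylin API): numeric columns must compare by value while string columns compare lexically.

    import java.util.Comparator;

    public class TypeOrderSketch {

        // Illustrative comparators; DataTypeOrder picks one per column type.
        static final Comparator<String> NUMERIC = Comparator.comparingDouble(Double::parseDouble);
        static final Comparator<String> LITERAL = Comparator.naturalOrder();

        public static void main(String[] args) {
            System.out.println(NUMERIC.compare("9", "10") < 0); // true: 9 < 10 by value
            System.out.println(LITERAL.compare("9", "10") < 0); // false: "9" > "10" lexically
        }
    }
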
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
deleted file mode 100644
index 85678ac..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
-import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
-import org.apache.kylin.cube.kv.LazyRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * @author xjiang
- */
-public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
-
-    private static final Logger logger = LoggerFactory.getLogger(HBaseKeyRange.class);
-
-    private static final int FUZZY_VALUE_CAP = 100;
-    private static final byte[] ZERO_TAIL_BYTES = new byte[] { 0 };
-
-    private final CubeSegment cubeSeg;
-    private final Cuboid cuboid;
-    private final List<Collection<ColumnValueRange>> flatOrAndFilter; // OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
-
-    private byte[] startKey;
-    private byte[] stopKey;
-    private List<Pair<byte[], byte[]>> fuzzyKeys;
-
-    private String startKeyString;
-    private String stopKeyString;
-    private String fuzzyKeyString;
-
-    private long partitionColumnStartDate = Long.MIN_VALUE;
-    private long partitionColumnEndDate = Long.MAX_VALUE;
-
-    public HBaseKeyRange(CubeSegment cubeSeg, Cuboid cuboid, byte[] startKey, byte[] stopKey, List<Pair<byte[], byte[]>> fuzzyKeys, List<Collection<ColumnValueRange>> flatColumnValueFilter, long partitionColumnStartDate, long partitionColumnEndDate) {
-        this.cubeSeg = cubeSeg;
-        this.cuboid = cuboid;
-        this.startKey = startKey;
-        this.stopKey = stopKey;
-        this.fuzzyKeys = fuzzyKeys;
-        this.flatOrAndFilter = flatColumnValueFilter;
-        this.partitionColumnStartDate = partitionColumnStartDate;
-        this.partitionColumnEndDate = partitionColumnEndDate;
-        initDebugString();
-    }
-
-    public HBaseKeyRange(Collection<TblColRef> dimensionColumns, Collection<ColumnValueRange> andDimensionRanges, CubeSegment cubeSeg, CubeDesc cubeDesc) {
-        this.cubeSeg = cubeSeg;
-        long cuboidId = this.calculateCuboidID(cubeDesc, dimensionColumns);
-        this.cuboid = Cuboid.findById(cubeSeg, cuboidId);
-        this.flatOrAndFilter = Lists.newLinkedList();
-        this.flatOrAndFilter.add(andDimensionRanges);
-        init(andDimensionRanges);
-        initDebugString();
-    }
-
-    private long calculateCuboidID(CubeDesc cube, Collection<TblColRef> dimensions) {
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cube.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return cuboidID;
-    }
-
-    private void init(Collection<ColumnValueRange> andDimensionRanges) {
-        int size = andDimensionRanges.size();
-        Map<TblColRef, String> startValues = Maps.newHashMapWithExpectedSize(size);
-        Map<TblColRef, String> stopValues = Maps.newHashMapWithExpectedSize(size);
-        Map<TblColRef, Set<String>> fuzzyValues = Maps.newHashMapWithExpectedSize(size);
-        for (ColumnValueRange dimRange : andDimensionRanges) {
-            TblColRef column = dimRange.getColumn();
-            startValues.put(column, dimRange.getBeginValue());
-            stopValues.put(column, dimRange.getEndValue());
-            fuzzyValues.put(column, dimRange.getEqualValues());
-
-            TblColRef partitionDateColumnRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (column.equals(partitionDateColumnRef)) {
-                initPartitionRange(dimRange);
-            }
-        }
-
-        AbstractRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
-        encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
-        this.startKey = encoder.encode(startValues);
-        encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
-        // In order to make stopRow inclusive add a trailing 0 byte. #See Scan.setStopRow(byte [] stopRow)
-        this.stopKey = Bytes.add(encoder.encode(stopValues), ZERO_TAIL_BYTES);
-
-        // always fuzzy match cuboid ID to lock on the selected cuboid
-        this.fuzzyKeys = buildFuzzyKeys(fuzzyValues);
-    }
-
-    private void initPartitionRange(ColumnValueRange dimRange) {
-        if (null != dimRange.getBeginValue()) {
-            this.partitionColumnStartDate = DateFormat.stringToMillis(dimRange.getBeginValue());
-        }
-        if (null != dimRange.getEndValue()) {
-            this.partitionColumnEndDate = DateFormat.stringToMillis(dimRange.getEndValue());
-        }
-    }
-
-    private void initDebugString() {
-        this.startKeyString = BytesUtil.toHex(this.startKey);
-        this.stopKeyString = BytesUtil.toHex(this.stopKey);
-        StringBuilder buf = new StringBuilder();
-        for (Pair<byte[], byte[]> fuzzyKey : this.fuzzyKeys) {
-            buf.append(BytesUtil.toHex(fuzzyKey.getFirst()));
-            buf.append(" ");
-            buf.append(BytesUtil.toHex(fuzzyKey.getSecond()));
-            buf.append(";");
-        }
-        this.fuzzyKeyString = buf.toString();
-    }
-
-    private List<Pair<byte[], byte[]>> buildFuzzyKeys(Map<TblColRef, Set<String>> fuzzyValueSet) {
-        ArrayList<Pair<byte[], byte[]>> result = new ArrayList<Pair<byte[], byte[]>>();
-
-        // debug/profiling purpose
-        if (BackdoorToggles.getDisableFuzzyKey()) {
-            logger.info("The execution of this query will not use fuzzy key");
-            return result;
-        }
-
-        FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
-        FuzzyMaskEncoder fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
-
-        List<Map<TblColRef, String>> fuzzyValues = FuzzyValueCombination.calculate(fuzzyValueSet, FUZZY_VALUE_CAP);
-        for (Map<TblColRef, String> fuzzyValue : fuzzyValues) {
-            result.add(Pair.newPair(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue)));
-        }
-        return result;
-    }
-
-    public CubeSegment getCubeSegment() {
-        return this.cubeSeg;
-    }
-
-    public Cuboid getCuboid() {
-        return cuboid;
-    }
-
-    public byte[] getStartKey() {
-        return startKey;
-    }
-
-    public byte[] getStopKey() {
-        return stopKey;
-    }
-
-    public List<Pair<byte[], byte[]>> getFuzzyKeys() {
-        return fuzzyKeys;
-    }
-
-    public String getStartKeyAsString() {
-        return startKeyString;
-    }
-
-    public String getStopKeyAsString() {
-        return stopKeyString;
-    }
-
-    public String getFuzzyKeyAsString() {
-        return fuzzyKeyString;
-    }
-
-    public List<Collection<ColumnValueRange>> getFlatOrAndFilter() {
-        return flatOrAndFilter;
-    }
-
-    public long getPartitionColumnStartDate() {
-        return partitionColumnStartDate;
-    }
-
-    public long getPartitionColumnEndDate() {
-        return partitionColumnEndDate;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((cubeSeg == null) ? 0 : cubeSeg.hashCode());
-        result = prime * result + ((cuboid == null) ? 0 : cuboid.hashCode());
-        result = prime * result + ((fuzzyKeyString == null) ? 0 : fuzzyKeyString.hashCode());
-        result = prime * result + ((startKeyString == null) ? 0 : startKeyString.hashCode());
-        result = prime * result + ((stopKeyString == null) ? 0 : stopKeyString.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        HBaseKeyRange other = (HBaseKeyRange) obj;
-        if (cubeSeg == null) {
-            if (other.cubeSeg != null)
-                return false;
-        } else if (!cubeSeg.equals(other.cubeSeg))
-            return false;
-        if (cuboid == null) {
-            if (other.cuboid != null)
-                return false;
-        } else if (!cuboid.equals(other.cuboid))
-            return false;
-        if (fuzzyKeyString == null) {
-            if (other.fuzzyKeyString != null)
-                return false;
-        } else if (!fuzzyKeyString.equals(other.fuzzyKeyString))
-            return false;
-        if (startKeyString == null) {
-            if (other.startKeyString != null)
-                return false;
-        } else if (!startKeyString.equals(other.startKeyString))
-            return false;
-        if (stopKeyString == null) {
-            if (other.stopKeyString != null)
-                return false;
-        } else if (!stopKeyString.equals(other.stopKeyString))
-            return false;
-        return true;
-    }
-
-    @Override
-    public int compareTo(HBaseKeyRange other) {
-        return Bytes.compareTo(this.startKey, other.startKey);
-    }
-
-    public boolean hitSegment() {
-        return cubeSeg.getTSRange().start.v <= getPartitionColumnEndDate() && cubeSeg.getTSRange().end.v >= getPartitionColumnStartDate();
-    }
-}
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 073c12c..08bcb65 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -32,7 +32,6 @@ import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
 import org.apache.kylin.dict.NumberDictionaryForestBuilder;
 import org.apache.kylin.dict.StringBytesConverter;
@@ -121,13 +120,11 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         ByteArray segmentStart = enc(info, 0, "2015-01-14");
         ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00"); // when the partition col is dict encoded, the time format is interchangeable
-        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
         assertEquals(segmentStart, segmentStartX);
 
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size()); // scan ranges are [closed, closed]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -136,64 +133,57 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         }
         {
             LogicalTupleFilter filter = and(timeComp2, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp4, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
                     and(timeComp6, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
             assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-                    r.get(0).fuzzyKeys.toString());
+                    r.get(1).fuzzyKeys.toString());
         }
         {
             LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
         }
         {
             LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
-            assertEquals(0, r.get(0).fuzzyKeys.size());
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+            assertEquals(0, r.get(1).fuzzyKeys.size());
         }
         {
             //skip FALSE filter
             LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             //TRUE or FALSE filter
             LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -201,8 +191,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             //TRUE or other filter
             LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -211,12 +200,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void verifySegmentSkipping2() {
-        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
-
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size()); // scan ranges are [closed, closed]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -226,10 +212,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());//scan range are [close,close]
+            assertEquals(1, r.size()); // scan ranges are [closed, closed]
         }
     }
 
@@ -239,7 +224,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // flatten or-and & hbase fuzzy value
         {
             LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
@@ -249,7 +234,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // pre-evaluate ever false
         {
             LogicalTupleFilter filter = and(timeComp1, timeComp2);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
@@ -257,7 +242,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // pre-evaluate ever true
         {
             LogicalTupleFilter filter = or(timeComp1, ageComp4);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
@@ -265,7 +250,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // merge overlap range
         {
             LogicalTupleFilter filter = or(timeComp1, timeComp3);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
@@ -274,7 +259,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
                     and(timeComp4, ageComp3));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(3, r.size());
             assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java b/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
deleted file mode 100644
index 0e7e91f..0000000
--- a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.translate;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.dict.TrieDictionaryBuilder;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class ColumnValueRangeTest extends LocalFileMetadataTestCase {
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        staticCreateTestMetadata();
-    }
-
-    @AfterClass
-    public static void after() throws Exception {
-        cleanAfterClass();
-    }
-
-    @Test
-    public void testPreEvaluateWithDict() {
-        TblColRef col = mockupTblColRef();
-        Dictionary<String> dict = mockupDictionary(col, "CN", "US");
-
-        ColumnValueRange r1 = new ColumnValueRange(col, set("CN", "US", "Other"), FilterOperatorEnum.EQ);
-        r1.preEvaluateWithDict(dict);
-        assertEquals(set("CN", "US"), r1.getEqualValues());
-
-        // less than rounding
-        {
-            ColumnValueRange r2 = new ColumnValueRange(col, set("CN"), FilterOperatorEnum.LT);
-            r2.preEvaluateWithDict(dict);
-            assertEquals(null, r2.getBeginValue());
-            assertEquals("CN", r2.getEndValue());
-
-            ColumnValueRange r3 = new ColumnValueRange(col, set("Other"), FilterOperatorEnum.LT);
-            r3.preEvaluateWithDict(dict);
-            assertEquals(null, r3.getBeginValue());
-            assertEquals("CN", r3.getEndValue());
-
-            ColumnValueRange r4 = new ColumnValueRange(col, set("UT"), FilterOperatorEnum.LT);
-            r4.preEvaluateWithDict(dict);
-            assertEquals(null, r4.getBeginValue());
-            assertEquals("US", r4.getEndValue());
-        }
-
-        // greater than rounding
-        {
-            ColumnValueRange r2 = new ColumnValueRange(col, set("CN"), FilterOperatorEnum.GTE);
-            r2.preEvaluateWithDict(dict);
-            assertEquals("CN", r2.getBeginValue());
-            assertEquals(null, r2.getEndValue());
-
-            ColumnValueRange r3 = new ColumnValueRange(col, set("Other"), FilterOperatorEnum.GTE);
-            r3.preEvaluateWithDict(dict);
-            assertEquals("US", r3.getBeginValue());
-            assertEquals(null, r3.getEndValue());
-
-            ColumnValueRange r4 = new ColumnValueRange(col, set("CI"), FilterOperatorEnum.GTE);
-            r4.preEvaluateWithDict(dict);
-            assertEquals("CN", r4.getBeginValue());
-            assertEquals(null, r4.getEndValue());
-        }
-
-        // ever false check
-        {
-            ColumnValueRange r2 = new ColumnValueRange(col, set("UT"), FilterOperatorEnum.GTE);
-            r2.preEvaluateWithDict(dict);
-            assertTrue(r2.satisfyNone());
-
-            ColumnValueRange r3 = new ColumnValueRange(col, set("CM"), FilterOperatorEnum.LT);
-            r3.preEvaluateWithDict(dict);
-            assertTrue(r3.satisfyNone());
-        }
-    }
-
-    public static Dictionary<String> mockupDictionary(TblColRef col, String... values) {
-        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<String>(new StringBytesConverter());
-        for (String v : values) {
-            builder.addValue(v);
-        }
-        return builder.build(0);
-    }
-
-    private static Set<String> set(String... values) {
-        HashSet<String> list = new HashSet<String>();
-        list.addAll(Arrays.asList(values));
-        return list;
-    }
-
-    public static TblColRef mockupTblColRef() {
-        TableDesc t = TableDesc.mockup("table_a");
-        return TblColRef.mockup(t, 1, "col_1", "string");
-    }
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 40f1ac5..5dd55b2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -43,10 +43,10 @@ import com.google.common.collect.Sets;
 
 /**
  */
+@SuppressWarnings("serial")
 public class BaseCuboidBuilder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
-    public static final String HIVE_NULL = "\\N";
     protected String cubeName;
     protected Cuboid baseCuboid;
     protected CubeDesc cubeDesc;
@@ -95,7 +95,6 @@ public class BaseCuboidBuilder implements java.io.Serializable {
 
     private void initNullBytes() {
         nullStrs = Sets.newHashSet();
-        nullStrs.add(HIVE_NULL);
         String[] nullStrings = cubeDesc.getNullStrings();
         if (nullStrings != null) {
             for (String s : nullStrings) {
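
Both the builder here and BaseCuboidMapperBase below stop hardcoding the Hive NULL marker "\N" into their null-string sets; only null strings configured on the cube descriptor are honored, on the assumption that the Hive input side now normalizes its own marker. A minimal sketch of the resulting check:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class NullStringsSketch {

        // Only cube-configured null strings are recognized; no implicit "\N".
        static String normalize(String v, Set<String> configuredNullStrings) {
            return (v == null || configuredNullStrings.contains(v)) ? null : v;
        }

        public static void main(String[] args) {
            Set<String> nulls = new HashSet<>(Arrays.asList("N/A", "-"));
            System.out.println(normalize("\\N", nulls)); // "\N" passes through now
            System.out.println(normalize("N/A", nulls)); // null
        }
    }
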
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 0ad4b9e..091f9a2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -34,15 +38,10 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
 /**
  */
 abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
-    public static final String HIVE_NULL = "\\N";
     public static final byte[] ONE = Bytes.toBytes("1");
     protected String cubeName;
     protected String segmentID;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index f3bdabd..4305a25 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -104,7 +104,6 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
 
     private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
         int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
-        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase);
 
         job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 9bede82..4ea04d5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -55,8 +55,7 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
 
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
     }
 
     @Override
@@ -65,8 +64,6 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
         if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
             Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
             return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
-        } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) {
-            return reducerMapping.getReducerIdForDatePartitionColumn();
         } else {
             return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
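
After this change the partitioner has exactly two cases: statistics records flagged by MARK_FOR_HLL_COUNTER in the first key byte go to the per-cuboid row-count reducers, and every other record carries its target reducer id in that same byte. A sketch of the dispatch rule (the sentinel value and the single HLL reducer id are simplifications):

    public class DispatchSketch {

        static final byte MARK_FOR_HLL_COUNTER = (byte) 0xFF; // illustrative sentinel

        // Mirrors the two branches left after the patch: HLL statistics vs.
        // dimension/dictionary column records.
        static int reducerFor(byte[] key, int hllReducerId) {
            if (key[0] == MARK_FOR_HLL_COUNTER)
                return hllReducerId; // row-count statistics for some cuboid
            return key[0] & 0xFF;    // unsigned read of the routing byte
        }

        public static void main(String[] args) {
            System.out.println(reducerFor(new byte[] { 5, 'a' }, 9));                  // 5
            System.out.println(reducerFor(new byte[] { MARK_FOR_HLL_COUNTER, 1 }, 9)); // 9
        }
    }
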
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index f96944a..8f5d176 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -131,6 +131,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             throws IOException {
         FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
         int numberOfReducers = reducerMapping.getTotalReducerNum();
+        logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers);
         if (numberOfReducers > 250) {
             throw new IllegalArgumentException(
                     "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
@@ -141,7 +142,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         job.setReducerClass(FactDistinctColumnsReducer.class);
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);
-        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum());
 
         // make each reducer output to respective dir
         MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 569b810..fc9dc65 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -37,7 +37,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,9 +67,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
     private static final Text EMPTY_TEXT = new Text();
 
-    private int partitionColumnIndex = -1;
-    private boolean needFetchPartitionCol = true;
-
     private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
 
     @Override
@@ -96,18 +92,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
         }
 
-        TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-        if (partitionColRef != null) {
-            partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
-        }
-
-        // check whether need fetch the partition col values
-        if (partitionColumnIndex < 0) {
-            // if partition col not on cube, no need
-            needFetchPartitionCol = false;
-        } else {
-            needFetchPartitionCol = true;
-        }
         //for KYLIN-2518 backward compatibility
         boolean isUsePutRowKeyToHllNewAlgorithm;
         if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
@@ -172,12 +156,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
         for (String[] row : rowCollection) {
             context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
-            for (int i = 0; i < dictCols.size(); i++) {
-                String fieldValue = row[dictionaryColumnIndex[i]];
+            for (int i = 0; i < allDimDictCols.size(); i++) {
+                String fieldValue = row[columnIndex[i]];
                 if (fieldValue == null)
                     continue;
 
-                int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue);
+                int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
                 
                 tmpbuf.clear();
                 byte[] valueBytes = Bytes.toBytes(fieldValue);
@@ -188,15 +172,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
                 tmpbuf.put(valueBytes);
                 outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                DataType type = dictCols.get(i).getType();
+                DataType type = allDimDictCols.get(i).getType();
                 sortableKey.init(outputKey, type);
-                //judge type
                 context.write(sortableKey, EMPTY_TEXT);
 
                 // log a few rows for troubleshooting
                 if (rowCount < 10) {
                     logger.info(
-                            "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+                            "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
                 }
             }
 
@@ -204,22 +187,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 putRowKeyToHLL(row);
             }
 
-            if (needFetchPartitionCol == true) {
-                String fieldValue = row[partitionColumnIndex];
-                if (fieldValue != null) {
-                    tmpbuf.clear();
-                    byte[] valueBytes = Bytes.toBytes(fieldValue);
-                    int size = valueBytes.length + 1;
-                    if (size >= tmpbuf.capacity()) {
-                        tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
-                    }
-                    tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL);
-                    tmpbuf.put(valueBytes);
-                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                    sortableKey.init(outputKey, (byte) 0);
-                    context.write(sortableKey, EMPTY_TEXT);
-                }
-            }
             rowCount++;
         }
     }
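
All dimension and dictionary columns now share one output-key shape in the mapper: the low byte of the target reducer id, then the UTF-8 bytes of the value. A self-contained sketch of that encoding as read from the hunk above (class and method names are illustrative):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class ReducerRoutedKeySketch {

        // One routing byte (the low byte of the reducer id) followed by the
        // raw value bytes; the partitioner dispatches on the first byte alone.
        static byte[] encode(int reducerIndex, String fieldValue) {
            byte[] value = fieldValue.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(1 + value.length);
            buf.put((byte) reducerIndex);
            buf.put(value);
            return buf.array();
        }

        public static void main(String[] args) {
            byte[] key = encode(3, "2015-01-14");
            System.out.println(key[0]); // 3, read back by the partitioner
        }
    }
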
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index c66042b..ceddeb5 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -39,8 +39,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.collect.Lists;
-
 /**
  */
 abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
@@ -50,8 +48,8 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected CubeSegment cubeSeg;
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
-    protected List<TblColRef> dictCols;
     protected IMRTableInputFormat flatTableInputFormat;
+    protected List<TblColRef> allDimDictCols;
 
     protected Text outputKey = new Text();
     //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
@@ -59,7 +57,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected int errorRecordCounter = 0;
 
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
-    protected int[] dictionaryColumnIndex;
+    protected int[] columnIndex;
 
     protected FactDistinctColumnsReducerMapping reducerMapping;
     
@@ -74,20 +72,18 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
         cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+        allDimDictCols = reducerMapping.getAllDimDictCols();
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
         intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
-        dictionaryColumnIndex = new int[dictCols.size()];
-        for (int i = 0; i < dictCols.size(); i++) {
-            TblColRef colRef = dictCols.get(i);
+        columnIndex = new int[allDimDictCols.size()];
+        for (int i = 0; i < allDimDictCols.size(); i++) {
+            TblColRef colRef = allDimDictCols.get(i);
             int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
-            dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+            columnIndex[i] = columnIndexOnFlatTbl;
         }
-
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 801771a..61ba247 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -71,17 +70,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private boolean isStatistics = false;
     private KylinConfig cubeConfig;
     private int taskId;
-    private boolean isPartitionCol = false;
     private int rowCount = 0;
     private FactDistinctColumnsReducerMapping reducerMapping;
 
     //local build dict
     private boolean buildDictInReducer;
     private IDictionaryBuilder builder;
-    private long timeMaxValue = Long.MIN_VALUE;
-    private long timeMinValue = Long.MAX_VALUE;
+    private String maxValue = null;
+    private String minValue = null;
     public static final String DICT_FILE_POSTFIX = ".rldict";
-    public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
+    public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
 
     private MultipleOutputs mos;
 
@@ -97,11 +95,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         cubeConfig = cube.getConfig();
         cubeDesc = cube.getDescriptor();
 
-        int numberOfTasks = context.getNumReduceTasks();
         taskId = context.getTaskAttemptID().getTaskID().getId();
 
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
         
         logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
 
@@ -114,18 +110,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             samplingPercentage = Integer
                     .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
             logger.info("Reducer " + taskId + " handling stats");
-        } else if (reducerMapping.isPartitionColReducer(taskId)) {
-            // partition col
-            isPartitionCol = true;
-            col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (col == null) {
-                logger.info("No partition col. This reducer will do nothing");
-            } else {
-                logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity());
-            }
         } else {
             // normal col
-            col = reducerMapping.getDictColForReducer(taskId);
+            col = reducerMapping.getColForReducer(taskId);
             Preconditions.checkNotNull(col);
 
             // local build dict
@@ -133,7 +120,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
                 buildDictInReducer = false;
             }
-            if (reducerMapping.getReducerNumForDictCol(col) > 1) {
+            if (reducerMapping.getReducerNumForDimCol(col) > 1) {
                 buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
             }
             if (buildDictInReducer) {
@@ -167,24 +154,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                     cuboidHLLMap.put(cuboidId, hll);
                 }
             }
-        } else if (isPartitionCol) {
-            // partition col
+        } else {
             String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
             logAFewRows(value);
-            long time = DateFormat.stringToMillis(value);
-            timeMinValue = Math.min(timeMinValue, time);
-            timeMaxValue = Math.max(timeMaxValue, time);
-        } else {
-            // normal col
-            if (buildDictInReducer) {
-                String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
-                logAFewRows(value);
-                builder.addValue(value);
-            } else {
-                byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-                // output written to baseDir/colName/-r-00000 (etc)
-                String fileName = col.getIdentity() + "/";
-                mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+            // if dimension col, compute max/min value
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                if (minValue == null || col.getType().compare(minValue, value) > 0) {
+                    minValue = value;
+                }
+                if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+                    maxValue = value;
+                }
+            }
+
+            // if dict column
+            if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                if (buildDictInReducer) {
+                    builder.addValue(value);
+                } else {
+                    byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+                    // output written to baseDir/colName/-r-00000 (etc)
+                    String fileName = col.getIdentity() + "/";
+                    mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+                }
             }
         }
 
@@ -207,11 +199,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
 
             logMapperAndCuboidStatistics(allCuboids); // for human check
             outputStatistics(allCuboids);
-        } else if (isPartitionCol) {
-            // partition col
-            outputPartitionInfo();
         } else {
-            // normal col
+            // dimension col
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                outputDimRangeInfo();
+            }
+            // dict col
             if (buildDictInReducer) {
                 Dictionary<String> dict = builder.build();
                 outputDict(col, dict);
@@ -221,14 +214,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         mos.close();
     }
 
-    private void outputPartitionInfo() throws IOException, InterruptedException {
-        if (col != null) {
-            // output written to baseDir/colName/colName.pci-r-00000 (etc)
-            String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
-
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
-            logger.info("write partition info for col : " + col.getName() + "  minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+    private void outputDimRangeInfo() throws IOException, InterruptedException {
+        if (col != null && minValue != null) {
+            // output written to baseDir/colName/colName.dci-r-00000 (etc)
+            String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(minValue.getBytes()),
+                    dimRangeFileName);
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(maxValue.getBytes()),
+                    dimRangeFileName);
+            logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue + " maxValue:"
+                    + maxValue);
         }
     }
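
With the partition-column special case gone, the reducer folds a running min/max over every dimension column's value stream, comparing by the column's declared type instead of raw bytes. A self-contained sketch of the same fold, with a plain Comparator standing in for the column type order introduced by this commit:

    import java.util.Comparator;

    class DimRangeTracker {
        private final Comparator<String> order;
        private String min = null;
        private String max = null;

        DimRangeTracker(Comparator<String> order) {
            this.order = order;
        }

        void update(String value) {
            if (min == null || order.compare(min, value) > 0) min = value;
            if (max == null || order.compare(max, value) < 0) max = value;
        }
    }

For an integer column the comparator would parse before comparing, e.g. Comparator.comparingLong(Long::parseLong), which is why "50" can rank above "199" even though it sorts lower as plain text.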
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
index 51594c3..b60e4ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
@@ -18,73 +18,74 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import com.google.common.collect.Lists;
+
 /**
  * Reducers play different roles based on reducer-id:
- * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer
- * - one reducer to get min/max of date partition column
+ * - (starting from 0) one reducer for each dimension or dictionary column; a UHC column may have more than one reducer
  * - (at the end) one or more reducers to collect row counts for cuboids using HLL
  */
 public class FactDistinctColumnsReducerMapping {
 
-    public static final int MARK_FOR_PARTITION_COL = -2;
     public static final int MARK_FOR_HLL_COUNTER = -1;
 
-    final private int nDictValueCollectors;
-    final private int datePartitionReducerId;
     final private int nCuboidRowCounters;
+    final private int nDimReducers;
     final private int nTotalReducers;
 
-    final private List<TblColRef> allDictCols;
-    final private int[] dictColIdToReducerBeginId;
+    final private List<TblColRef> allDimDictCols = Lists.newArrayList();
+    final private int[] colIdToReducerBeginId;
     final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers)
 
     public FactDistinctColumnsReducerMapping(CubeInstance cube) {
         this(cube, 0);
     }
 
-    public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
+    private FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
         CubeDesc desc = cube.getDescriptor();
+        Set<TblColRef> allCols = cube.getAllColumns();
+        Set<TblColRef> dictCols = desc.getAllColumnsNeedDictionaryBuilt();
+        List<TblColRef> dimCols = desc.listDimensionColumnsExcludingDerived(true);
+        for (TblColRef colRef : allCols) {
+            if (dictCols.contains(colRef)) {
+                allDimDictCols.add(colRef);
+            } else if (dimCols.indexOf(colRef) >= 0) {
+                allDimDictCols.add(colRef);
+            }
+        }
 
-        allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt());
-
-        dictColIdToReducerBeginId = new int[allDictCols.size() + 1];
+        colIdToReducerBeginId = new int[allDimDictCols.size() + 1];
 
         int uhcReducerCount = cube.getConfig().getUHCReducerCount();
         List<TblColRef> uhcList = desc.getAllUHCColumns();
         int counter = 0;
-        for (int i = 0; i < allDictCols.size(); i++) {
-            dictColIdToReducerBeginId[i] = counter;
-            boolean isUHC = uhcList.contains(allDictCols.get(i));
+        for (int i = 0; i < allDimDictCols.size(); i++) {
+            colIdToReducerBeginId[i] = counter;
+            boolean isUHC = uhcList.contains(allDimDictCols.get(i));
             counter += (isUHC) ? uhcReducerCount : 1;
         }
-
-        dictColIdToReducerBeginId[allDictCols.size()] = counter;
-        nDictValueCollectors = counter;
-        datePartitionReducerId = counter;
+        colIdToReducerBeginId[allDimDictCols.size()] = counter;
+        nDimReducers = counter;
 
         nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? //
                 MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum;
-        nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters;
+        nTotalReducers = nDimReducers + nCuboidRowCounters;
 
         reducerRolePlay = new int[nTotalReducers];
         for (int i = 0, dictId = 0; i < nTotalReducers; i++) {
-            if (i > datePartitionReducerId) {
+            if (i >= nDimReducers) {
                 // cuboid HLL counter reducer
                 reducerRolePlay[i] = MARK_FOR_HLL_COUNTER;
-            } else if (i == datePartitionReducerId) {
-                // date partition min/max reducer
-                reducerRolePlay[i] = MARK_FOR_PARTITION_COL;
             } else {
-                // dict value collector reducer
-                if (i == dictColIdToReducerBeginId[dictId + 1])
+                if (i == colIdToReducerBeginId[dictId + 1])
                     dictId++;
 
                 reducerRolePlay[i] = dictId;
@@ -92,8 +93,8 @@ public class FactDistinctColumnsReducerMapping {
         }
     }
     
-    public List<TblColRef> getAllDictCols() {
-        return allDictCols;
+    public List<TblColRef> getAllDimDictCols() {
+        return allDimDictCols;
     }
     
     public int getTotalReducerNum() {
@@ -104,9 +105,9 @@ public class FactDistinctColumnsReducerMapping {
         return nCuboidRowCounters;
     }
 
-    public int getReducerIdForDictCol(int dictColId, Object fieldValue) {
-        int begin = dictColIdToReducerBeginId[dictColId];
-        int span = dictColIdToReducerBeginId[dictColId + 1] - begin;
+    public int getReducerIdForCol(int colId, Object fieldValue) {
+        int begin = colIdToReducerBeginId[colId];
+        int span = colIdToReducerBeginId[colId + 1] - begin;
         
         if (span == 1)
             return begin;
@@ -120,37 +121,29 @@ public class FactDistinctColumnsReducerMapping {
     }
 
     public int getRolePlayOfReducer(int reducerId) {
-        return reducerRolePlay[reducerId];
+        return reducerRolePlay[reducerId % nTotalReducers];
     }
     
     public boolean isCuboidRowCounterReducer(int reducerId) {
         return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER;
     }
-    
-    public boolean isPartitionColReducer(int reducerId) {
-        return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL;
-    }
 
-    public TblColRef getDictColForReducer(int reducerId) {
-        int role = getRolePlayOfReducer(reducerId);
+    public TblColRef getColForReducer(int reducerId) {
+        int role = getRolePlayOfReducer(reducerId % nTotalReducers);
         if (role < 0)
             throw new IllegalStateException();
         
-        return allDictCols.get(role);
-    }
-
-    public int getReducerNumForDictCol(TblColRef col) {
-        int dictColId = allDictCols.indexOf(col);
-        return dictColIdToReducerBeginId[dictColId + 1] - dictColIdToReducerBeginId[dictColId];
+        return allDimDictCols.get(role);
     }
 
-    public int getReducerIdForDatePartitionColumn() {
-        return datePartitionReducerId;
+    public int getReducerNumForDimCol(TblColRef col) {
+        int dictColId = allDimDictCols.indexOf(col);
+        return colIdToReducerBeginId[dictColId + 1] - colIdToReducerBeginId[dictColId];
     }
 
     public int getReducerIdForCuboidRowCount(long cuboidId) {
         int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters);
-        return datePartitionReducerId + 1 + rowCounterId;
+        return nDimReducers + rowCounterId;
     }
     
 }
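
The rewritten mapping is a prefix-sum layout: colIdToReducerBeginId[i] holds the first reducer id owned by column i, each column owns a contiguous range of ids (UHC columns own several slots), and every id at or past nDimReducers is an HLL counter. A runnable sketch under assumed counts (three columns, the middle one UHC with two reducers, plus two HLL reducers; hypothetical numbers, not taken from the commit):

    class ReducerLayoutSketch {
        public static void main(String[] args) {
            int[] reducersPerCol = { 1, 2, 1 }; // hypothetical; the UHC column gets 2
            int hllReducers = 2;

            int[] beginId = new int[reducersPerCol.length + 1];
            for (int i = 0; i < reducersPerCol.length; i++)
                beginId[i + 1] = beginId[i] + reducersPerCol[i];
            int nDimReducers = beginId[reducersPerCol.length];

            for (int r = 0; r < nDimReducers + hllReducers; r++) {
                if (r >= nDimReducers) {
                    System.out.println("reducer " + r + " -> HLL counter");
                } else {
                    int col = 0;
                    while (r >= beginId[col + 1]) col++; // find the owning column
                    System.out.println("reducer " + r + " -> column " + col);
                }
            }
            // reducer 0 -> column 0; reducers 1,2 -> column 1 (UHC);
            // reducer 3 -> column 2; reducers 4,5 -> HLL counter
        }
    }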
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f749c80..fdb19db 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -23,15 +23,18 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.LookupMaterializeContext;
@@ -40,11 +43,14 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
  */
 public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
@@ -76,9 +82,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
         try {
             saveExtSnapshotIfNeeded(cubeManager, cube, segment);
-            if (segment.isOffsetCube()) {
-                updateTimeRange(segment);
-            }
+            updateSegment(segment);
 
             cubeManager.promoteNewlyBuiltSegments(cube, segment);
             return new ExecuteResult();
@@ -115,40 +119,51 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         }
     }
 
-    private void updateTimeRange(CubeSegment segment) throws IOException {
+    private void updateSegment(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
 
-        if (partitionCol == null) {
-            return;
-        }
-        final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity());
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
-                partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
-        if (outputFile == null) {
-            throw new IOException("fail to find the partition file in base dir: " + colDir);
-        }
-
-        FSDataInputStream is = null;
-        BufferedReader bufferedReader = null;
-        InputStreamReader isr = null;
-        long minValue, maxValue;
-        try {
-            is = fs.open(outputFile);
-            isr = new InputStreamReader(is);
-            bufferedReader = new BufferedReader(isr);
-            minValue = Long.parseLong(bufferedReader.readLine());
-            maxValue = Long.parseLong(bufferedReader.readLine());
-        } finally {
-            IOUtils.closeQuietly(is);
-            IOUtils.closeQuietly(isr);
-            IOUtils.closeQuietly(bufferedReader);
-        }
+        for (TblColRef dimColRef : segment.getCubeDesc().listDimensionColumnsExcludingDerived(true)) {
+            final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+            Path colDir = new Path(factColumnsInputPath, dimColRef.getIdentity());
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+
+            //handle multiple reducers
+            Path[] outputFiles = HadoopUtil.getFilteredPath(fs, colDir,
+                    dimColRef.getName() + FactDistinctColumnsReducer.DIMENSION_COL_INFO_FILE_POSTFIX);
+            if (outputFiles == null || outputFiles.length == 0) {
+                segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(null, null));
+                continue;
+            }
 
-        logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue);
-        if (minValue != timeMinValue && maxValue != timeMaxValue) {
-            segment.setTSRange(new TSRange(minValue, maxValue + 1));
+            FSDataInputStream is = null;
+            BufferedReader bufferedReader = null;
+            InputStreamReader isr = null;
+            Set<String> minValues = Sets.newHashSet(), maxValues = Sets.newHashSet();
+            for (Path outputFile : outputFiles) {
+                try {
+                    is = fs.open(outputFile);
+                    isr = new InputStreamReader(is);
+                    bufferedReader = new BufferedReader(isr);
+                    minValues.add(bufferedReader.readLine());
+                    maxValues.add(bufferedReader.readLine());
+                } finally {
+                    IOUtils.closeQuietly(is);
+                    IOUtils.closeQuietly(isr);
+                    IOUtils.closeQuietly(bufferedReader);
+                }
+            }
+            DataTypeOrder order = dimColRef.getType().getOrder();
+            String minValue = order.min(minValues);
+            String maxValue = order.max(maxValues);
+            logger.info("updateSegment step. {} minValue:" + minValue + " maxValue:" + maxValue, dimColRef.getName());
+
+            if (segment.isOffsetCube() && partitionCol != null && partitionCol.getIdentity().equals(dimColRef.getIdentity())) {
+                logger.info("update partition. {} timeMinValue:" + minValue + " timeMaxValue:" + maxValue, dimColRef.getName());
+                if (DateFormat.stringToMillis(minValue) != timeMinValue && DateFormat.stringToMillis(maxValue) != timeMaxValue) {
+                    segment.setTSRange(new TSRange(DateFormat.stringToMillis(minValue), DateFormat.stringToMillis(maxValue) + 1));
+                }
+            }
+            segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(minValue, maxValue));
         }
     }
 }
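
Since a UHC dimension can span several reducers, a column may produce several .dci files, each holding one min line and one max line; the step above therefore takes the min of the mins and the max of the maxes under the column's type order. A sketch of that reduction (a plain Comparator in place of Kylin's DataTypeOrder):

    import java.util.Comparator;
    import java.util.Set;

    class DimRangeMerge {
        /** Min of per-reducer mins and max of per-reducer maxes, under the column's order. */
        static String[] merge(Set<String> mins, Set<String> maxs, Comparator<String> order) {
            String min = mins.stream().min(order).orElse(null);
            String max = maxs.stream().max(order).orElse(null);
            return new String[] { min, max };
        }
    }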
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index e7aec8c..4cf6fc6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -20,10 +20,12 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -61,9 +63,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         if (mergingSegmentIds.isEmpty()) {
             return ExecuteResult.createFailed(new SegmentNotFoundException("there are no merging segments"));
         }
+        
         long sourceCount = 0L;
         long sourceSize = 0L;
-
         boolean isOffsetCube = mergedSegment.isOffsetCube();
         Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L;
         for (String id : mergingSegmentIds) {
@@ -74,14 +76,26 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             tsEndMax = Math.max(tsEndMax, segment.getTSRange().end.v);
         }
 
+        Map<String, DimensionRangeInfo> mergedSegDimRangeMap = null;
+        for (String id : mergingSegmentIds) {
+            CubeSegment segment = cube.getSegmentById(id);
+            Map<String, DimensionRangeInfo> segDimRangeMap = segment.getDimensionRangeInfoMap();
+            if (mergedSegDimRangeMap == null) {
+                mergedSegDimRangeMap = segDimRangeMap;
+            } else {
+                mergedSegDimRangeMap = DimensionRangeInfo.mergeRangeMap(cube.getModel(), segDimRangeMap,
+                        mergedSegDimRangeMap);
+            }
+        }
+
         // update segment info
         mergedSegment.setSizeKB(cubeSizeBytes / 1024);
         mergedSegment.setInputRecords(sourceCount);
         mergedSegment.setInputRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
-        if (isOffsetCube == true) {
+        mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
+        if (isOffsetCube) {
             SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
             mergedSegment.setTSRange(tsRange);
         }
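
The merge step folds the per-segment range maps pairwise through DimensionRangeInfo.mergeRangeMap. A sketch of the envelope semantics this plausibly implements (assumptions: a null/null range from an empty segment acts as identity, columns missing from either map are dropped as unknown, and a single comparator stands in for per-column type order):

    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;

    class RangeMapMergeSketch {
        static class Range { // hypothetical stand-in for DimensionRangeInfo
            final String min, max;
            Range(String min, String max) { this.min = min; this.max = max; }
        }

        static Map<String, Range> merge(Map<String, Range> a, Map<String, Range> b, Comparator<String> order) {
            Map<String, Range> out = new HashMap<>();
            for (Map.Entry<String, Range> e : a.entrySet()) {
                Range r1 = e.getValue(), r2 = b.get(e.getKey());
                if (r2 == null) continue;                                  // range unknown on one side
                if (r1.min == null) { out.put(e.getKey(), r2); continue; } // empty segment is identity
                if (r2.min == null) { out.put(e.getKey(), r1); continue; }
                String min = order.compare(r1.min, r2.min) <= 0 ? r1.min : r2.min;
                String max = order.compare(r1.max, r2.max) >= 0 ? r1.max : r2.max;
                out.put(e.getKey(), new Range(min, max));
            }
            return out;
        }
    }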
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
index d013386..03aa616 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -59,6 +59,7 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
         segment.setSizeKB(cubeSizeBytes / 1024);
         segment.setInputRecords(sourceCount);
         segment.setInputRecordsSize(sourceSizeBytes);
+        segment.setDimensionRangeInfoMap(originalSegment.getDimensionRangeInfoMap());
 
         try {
             cubeManager.promoteNewlyOptimizeSegments(cube, segment);
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index a6bc019..3c58b26 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -60,24 +60,22 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         int totalReducerNum = mapping.getTotalReducerNum();
         Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
         
-        // check partition column reducer & cuboid row count reducers
+        // check cuboid row count reducers
         Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
                 mapping.getRolePlayOfReducer(totalReducerNum - 1));
         Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
                 mapping.getRolePlayOfReducer(totalReducerNum - 2));
-        Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL,
-                mapping.getRolePlayOfReducer(totalReducerNum - 3));
         
         // check all dict column reducers
-        int dictEnd = totalReducerNum - 3;
+        int dictEnd = totalReducerNum - 2;
         for (int i = 0; i < dictEnd; i++)
             Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0);
         
         // check a UHC dict column
-        Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC));
+        Assert.assertEquals(2, mapping.getReducerNumForDimCol(aUHC));
         int uhcReducerBegin = -1;
         for (int i = 0; i < dictEnd; i++) {
-            if (mapping.getDictColForReducer(i).equals(aUHC)) {
+            if (mapping.getColForReducer(i).equals(aUHC)) {
                 uhcReducerBegin = i;
                 break;
             }
@@ -86,7 +84,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         int[] allRolePlay = mapping.getAllRolePlaysForReducers();
         Assert.assertEquals(allRolePlay[uhcReducerBegin], allRolePlay[uhcReducerBegin + 1]);
         for (int i = 0; i < 5; i++) {
-            int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, i);
+            int reducerId = mapping.getReducerIdForCol(uhcReducerBegin, i);
             Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= uhcReducerBegin + 1);
         }
     }
diff --git a/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..124081c
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
@@ -0,0 +1,110 @@
+{
+  "uuid" : "70a9f288-3c01-4745-a04b-5641e82d6c72",
+  "name" : "ssb_cube_with_dimention_range",
+  "owner" : "ADMIN",
+  "cost" : 50,
+  "status" : "DISABLED",
+  "segments" : [{
+    "uuid": "f34e2a88-50e4-4c8c-8dd1-64e922ce8c37",
+    "name": "19920523163722_20180523173026",
+    "storage_location_identifier": "KYLIN_KFV177RJOS",
+    "date_range_start": 706639042000,
+    "date_range_end": 1527096626000,
+    "source_offset_start": 0,
+    "source_offset_end": 0,
+    "status": "READY",
+    "size_kb": 100,
+    "input_records": 1000,
+    "input_records_size": 102400,
+    "last_build_time": 1527068515334,
+    "last_build_job_id": "a5010166-b64a-4289-ac56-064640f7f2fa",
+    "create_time_utc": 1527067828660,
+    "total_shards": 0,
+    "blackout_cuboids": [],
+    "binary_signature": null,
+    "dimension_range_info_map": {
+      "PART.P_BRAND": {
+        "min": "MFGR#1101",
+        "max": "MFGR#5540"
+      },
+      "V_LINEORDER.LO_QUANTITY": {
+        "min": "1",
+        "max": "50"
+      },
+      "PART.P_MFGR": {
+        "min": "MFGR#1",
+        "max": "MFGR#5"
+      },
+      "DATES.D_YEARMONTHNUM": {
+        "min": "199205",
+        "max": "199808"
+      },
+      "CUSTOMER.C_NATION": {
+        "min": "ALGERIA",
+        "max": "VIETNAM"
+      },
+      "DATES.D_YEARMONTH": {
+        "min": "Apr1993",
+        "max": "Sep1997"
+      },
+      "CUSTOMER.C_CUSTKEY": {
+        "min": "1",
+        "max": "2999"
+      },
+      "DATES.D_DATEKEY": {
+        "min": "19920523",
+        "max": "19980802"
+      },
+      "SUPPLIER.S_SUPPKEY": {
+        "min": "1",
+        "max": "200"
+      },
+      "SUPPLIER.S_REGION": {
+        "min": "AFRICA",
+        "max": "MIDDLE EAST"
+      },
+      "PART.P_PARTKEY": {
+        "min": "1",
+        "max": "20000"
+      },
+      "PART.P_CATEGORY": {
+        "min": "MFGR#11",
+        "max": "MFGR#55"
+      },
+      "DATES.D_WEEKNUMINYEAR": {
+        "min": "1",
+        "max": "53"
+      },
+      "CUSTOMER.C_CITY": {
+        "min": "ALGERIA  0",
+        "max": "VIETNAM  9"
+      },
+      "SUPPLIER.S_NATION": {
+        "min": "ALGERIA",
+        "max": "VIETNAM"
+      },
+      "SUPPLIER.S_CITY": {
+        "min": "ALGERIA  1",
+        "max": "VIETNAM  9"
+      },
+      "CUSTOMER.C_REGION": {
+        "min": "AFRICA",
+        "max": "MIDDLE EAST"
+      },
+      "DATES.D_YEAR": {
+        "min": "1992",
+        "max": "1998"
+      },
+      "V_LINEORDER.LO_DISCOUNT": {
+        "min": "0",
+        "max": "10"
+      }
+    }
+  } ],
+  "last_modified" : 1457534216410,
+  "descriptor" : "ssb_cube_with_dimention_range",
+  "create_time_utc" : 1457444500888,
+  "size_kb" : 100,
+  "input_records_count" : 1000,
+  "input_records_size" : 102400
+}
\ No newline at end of file
diff --git a/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..d91b420
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
@@ -0,0 +1,269 @@
+{
+  "uuid" : "5c44df30-daec-486e-af90-927bf7851060",
+  "name" : "ssb_cube_with_dimention_range",
+  "description" : "",
+  "dimensions" : [ {
+    "name" : "PART.P_MFGR",
+    "table" : "PART",
+    "column" : "P_MFGR",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEARMONTH",
+    "table" : "DATES",
+    "column" : "D_YEARMONTH",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_NATION",
+    "table" : "SUPPLIER",
+    "column" : "S_NATION",
+    "derived" : null
+  }, {
+    "name" : "V_LINEORDER.LO_DISCOUNT",
+    "table" : "V_LINEORDER",
+    "column" : "LO_DISCOUNT",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_CUSTKEY",
+    "table" : "CUSTOMER",
+    "column" : "C_CUSTKEY",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_NATION",
+    "table" : "CUSTOMER",
+    "column" : "C_NATION",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_REGION",
+    "table" : "SUPPLIER",
+    "column" : "S_REGION",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_CITY",
+    "table" : "CUSTOMER",
+    "column" : "C_CITY",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_SUPPKEY",
+    "table" : "SUPPLIER",
+    "column" : "S_SUPPKEY",
+    "derived" : null
+  }, {
+    "name" : "V_LINEORDER.LO_QUANTITY",
+    "table" : "V_LINEORDER",
+    "column" : "LO_QUANTITY",
+    "derived" : null
+  }, {
+    "name" : "PART.P_BRAND",
+    "table" : "PART",
+    "column" : "P_BRAND",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_CITY",
+    "table" : "SUPPLIER",
+    "column" : "S_CITY",
+    "derived" : null
+  }, {
+    "name" : "PART.P_PARTKEY",
+    "table" : "PART",
+    "column" : "P_PARTKEY",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEARMONTHNUM",
+    "table" : "DATES",
+    "column" : "D_YEARMONTHNUM",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_WEEKNUMINYEAR",
+    "table" : "DATES",
+    "column" : "D_WEEKNUMINYEAR",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_REGION",
+    "table" : "CUSTOMER",
+    "column" : "C_REGION",
+    "derived" : null
+  }, {
+    "name" : "PART.P_CATEGORY",
+    "table" : "PART",
+    "column" : "P_CATEGORY",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEAR",
+    "table" : "DATES",
+    "column" : "D_YEAR",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_DATEKEY",
+    "table" : "DATES",
+    "column" : "D_DATEKEY",
+    "derived" : null
+  } ],
+  "measures" : [ {
+    "name" : "_COUNT_",
+    "function" : {
+      "expression" : "COUNT",
+      "parameter" : {
+        "type" : "constant",
+        "value" : "1",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  }, {
+    "name" : "TOTAL_REVENUE",
+    "function" : {
+      "expression" : "SUM",
+      "parameter" : {
+        "type" : "column",
+        "value" : "LO_REVENUE",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  }, {
+    "name" : "TOTAL_SUPPLYCOST",
+    "function" : {
+      "expression" : "SUM",
+      "parameter" : {
+        "type" : "column",
+        "value" : "LO_SUPPLYCOST",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  } ],
+  "rowkey" : {
+    "rowkey_columns" : [ {
+      "column" : "PART.P_MFGR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEARMONTHNUM",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_REGION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "V_LINEORDER.LO_QUANTITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEARMONTH",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_WEEKNUMINYEAR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_REGION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "V_LINEORDER.LO_DISCOUNT",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_CITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_CATEGORY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEAR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_BRAND",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_CITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_NATION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_NATION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_DATEKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_SUPPKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_CUSTKEY",
+      "encoding" : "dict",
+      "isShardBy" : true,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_PARTKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    } ]
+  },
+  "signature" : "",
+  "last_modified" : 1457503036686,
+  "model_name" : "ssb",
+  "null_string" : null,
+  "hbase_mapping" : {
+    "column_family" : [ {
+      "name" : "F1",
+      "columns" : [ {
+        "qualifier" : "M",
+        "measure_refs" : [ "_COUNT_", "TOTAL_REVENUE", "TOTAL_SUPPLYCOST"]
+      } ]
+    } ]
+  },
+  "aggregation_groups" : [ {
+    "includes" : [ "SUPPLIER.S_REGION", "CUSTOMER.C_NATION", "SUPPLIER.S_NATION", "CUSTOMER.C_REGION", "DATES.D_YEAR", "DATES.D_WEEKNUMINYEAR", "DATES.D_YEARMONTH", "DATES.D_YEARMONTHNUM", "SUPPLIER.S_CITY" ],
+    "select_rule" : {
+      "hierarchy_dims" : [ [ "S_REGION", "S_NATION", "S_CITY" ] ],
+      "mandatory_dims" : [ "DATES.D_YEAR" ],
+      "joint_dims" : [ ]
+    }
+  } ],
+  "notify_list" : [ ],
+  "status_need_notify" : [ ],
+  "partition_date_start" : 3153000000000,
+  "partition_date_end" : 3153600000000,
+  "auto_merge_time_ranges" : [ 604800000, 2419200000 ],
+  "retention_range" : 0,
+  "engine_type" : 2,
+  "storage_type" : 2,
+  "override_kylin_properties" : {
+    "kylin.storage.hbase.compression-codec" : "lz4",
+    "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+  }
+}
\ No newline at end of file
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index afd9788..16ceede 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,15 +18,31 @@
 
 package org.apache.kylin.provision;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
@@ -34,6 +50,7 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
@@ -48,6 +65,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
@@ -55,21 +73,9 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class BuildCubeWithEngine {
 
@@ -304,21 +310,30 @@ public class BuildCubeWithEngine {
         
         if (!buildSegment(cubeName, date1, date2))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date2, date3))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!optimizeCube(cubeName))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date3, date4))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date4, date5)) // one empty segment
             return false;
+        checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date5, date6)) // another empty segment
             return false;
+        checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
+
 
         if (!mergeSegment(cubeName, date2, date4)) // merge 2 normal segments
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!mergeSegment(cubeName, date2, date5)) // merge normal and empty
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         
         // now have 2 normal segments [date1, date2) [date2, date5) and 1 empty segment [date5, date6)
         return true;
@@ -460,4 +475,40 @@ public class BuildCubeWithEngine {
         }
     }
 
+    private void checkEmptySegRangeInfo(CubeInstance cube) {
+        CubeSegment segment = getLastModifiedSegment(cube);
+        for (String colId : segment.getDimensionRangeInfoMap().keySet()) {
+            DimensionRangeInfo range = segment.getDimensionRangeInfoMap().get(colId);
+            if (!(range.getMax() == null && range.getMin() == null)) {
+                throw new RuntimeException("Empty segment must have null info.");
+            }
+        }
+    }
+
+    private void checkNormalSegRangeInfo(CubeInstance cube) {
+        CubeSegment segment = getLastModifiedSegment(cube);
+        if (segment.getModel().getPartitionDesc().isPartitioned()) {
+            TblColRef colRef = segment.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            DimensionRangeInfo dmRangeInfo = segment.getDimensionRangeInfoMap().get(colRef.getIdentity());
+            long min_v = DateFormat.stringToMillis(dmRangeInfo.getMin());
+            long max_v = DateFormat.stringToMillis(dmRangeInfo.getMax());
+            long ts_range_start = segment.getTSRange().start.v;
+            long ts_range_end = segment.getTSRange().end.v;
+            if (!(ts_range_start <= min_v && max_v <= ts_range_end - 1)) {
+                throw new RuntimeException(String.format(
+                        "Build cube failed, wrong partition column min/max value."
+                                + " Segment: %s, min value: %s, TsRange.start: %s, max value: %s, TsRange.end: %s",
+                        segment, min_v, ts_range_start, max_v, ts_range_end));
+            }
+        }
+    }
+    
+    private CubeSegment getLastModifiedSegment(CubeInstance cube) {
+        return Collections.max(cube.getSegments(), new Comparator<CubeSegment>() {
+            @Override
+            public int compare(CubeSegment o1, CubeSegment o2) {
+                return Long.compare(o1.getLastBuildTime(), o2.getLastBuildTime());
+            }
+        });
+    }
 }
diff --git a/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java b/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
index cdd6004..39c90e8 100644
--- a/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
+++ b/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
@@ -18,8 +18,12 @@
 
 package org.apache.kylin.query.optrule;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
@@ -33,13 +37,11 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ImmutableBitSet;
 
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 
 /**
- * Supoort grouping query. Expand the non-simple aggregate to more than one simple aggregates.
+ * Support grouping queries. Expand a non-simple aggregate into multiple simple aggregates.
  * Add project on expanded simple aggregate to add indicators of origin aggregate.
  * All projects on aggregate added into one union, which replace the origin aggregate.
  * The new aggregates will be transformed by {@link org.apache.kylin.query.optrule.AggregateProjectReduceRule}, to reduce rolled up dimensions.
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 39a2669..b34b42e 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -39,7 +39,9 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
 import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.relnode.visitor.TupleFilterVisitor;
 
@@ -117,7 +119,29 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
             }
         }
 
-        context.filter = TupleFilter.and(context.filter, filter);
+        context.filter = and(context.filter, filter);
+    }
+    
+    private TupleFilter and(TupleFilter f1, TupleFilter f2) {
+        if (f1 == null)
+            return f2;
+        if (f2 == null)
+            return f1;
+
+        if (f1.getOperator() == FilterOperatorEnum.AND) {
+            f1.addChild(f2);
+            return f1;
+        }
+
+        if (f2.getOperator() == FilterOperatorEnum.AND) {
+            f2.addChild(f1);
+            return f2;
+        }
+
+        LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND);
+        and.addChild(f1);
+        and.addChild(f2);
+        return and;
     }
 
     @Override
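
The new and() helper flattens instead of nesting: when either operand is already an AND it absorbs the other as a child, so chaining yields one AND with many children rather than a left-leaning tree. A toy illustration of the same shape (simple Node type, not Kylin's TupleFilter; note that absorbing f1 into an AND f2 reorders children, which is harmless for conjunction):

    import java.util.ArrayList;
    import java.util.List;

    class Node {
        final String op; // "AND" or a leaf label
        final List<Node> children = new ArrayList<>();
        Node(String op) { this.op = op; }

        static Node and(Node f1, Node f2) {
            if (f1 == null) return f2;
            if (f2 == null) return f1;
            if (f1.op.equals("AND")) { f1.children.add(f2); return f1; }
            if (f2.op.equals("AND")) { f2.children.add(f1); return f2; }
            Node and = new Node("AND");
            and.children.add(f1);
            and.children.add(f2);
            return and;
        }
    }

So and(and(a, b), c) produces AND(a, b, c) with three children, keeping the filter tree shallow for later pushdown.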
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 8aec8b9..bfea632 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -45,7 +45,6 @@ import com.google.common.collect.Lists;
 
 public class HiveMRInput extends HiveInputBase implements IMRInput {
 
-    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
 
     @Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 75f322f..4ebdf3d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -126,7 +126,7 @@ public class HiveTableReader implements TableReader {
         String[] arr = new String[record.size()];
         for (int i = 0; i < arr.length; i++) {
             Object o = record.get(i);
-            arr[i] = (o == null) ? null : o.toString();
+            arr[i] = (o == null || "\\N".equals(o)) ? null : o.toString();
         }
         return arr;
     }
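
For context on the HiveTableReader change: Hive's text serde writes SQL NULL as the two-character marker \N, so without the check o.toString() would surface the literal marker instead of a null. A quick demonstration (note "\\N" in Java source is backslash plus N):

    public class HiveNullMarker {
        public static void main(String[] args) {
            Object o = "\\N"; // what the reader sees for a NULL cell in text storage
            String v = (o == null || "\\N".equals(o)) ? null : o.toString();
            System.out.println(v); // prints: null
        }
    }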