You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/06/30 12:22:55 UTC

[kylin] 09/12: KYLIN-3370 enhance segment pruning

This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch sync
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 18d8663f5e0902ca8093b8e3d6ca89993c1b5d3f
Author: ZhansShaoxiong <sh...@gmail.com>
AuthorDate: Mon Jun 18 10:55:51 2018 +0800

    KYLIN-3370 enhance segment pruning
---
 .../java/org/apache/kylin/cube/CubeSegment.java    |  12 +
 .../org/apache/kylin/cube/DimensionRangeInfo.java  | 104 ++++++++
 .../apache/kylin/cube/common/SegmentPruner.java    | 168 +++++++++++++
 .../kylin/cube/gridtable/ScanRangePlannerBase.java |   4 +-
 .../apache/kylin/cube/DimensionRangeInfoTest.java  |  86 +++++++
 .../kylin/cube/common/SegmentPrunerTest.java       | 186 ++++++++++++++
 .../apache/kylin/metadata/filter/TupleFilter.java  |  28 ++-
 .../kylin/metadata/datatype/DataTypeOrderTest.java |  57 +++++
 .../kylin/metadata/filter/TupleFilterTest.java     |  23 ++
 .../storage/gtrecord/CubeScanRangePlanner.java     |  30 +--
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |   9 +-
 .../kylin/storage/gtrecord/DictGridTableTest.java  |  69 +++---
 .../mr/steps/CalculateStatsFromBaseCuboidJob.java  |   1 -
 .../mr/steps/FactDistinctColumnPartitioner.java    |   5 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java    |   2 +-
 .../engine/mr/steps/FactDistinctColumnsMapper.java |  43 +---
 .../mr/steps/FactDistinctColumnsMapperBase.java    |  20 +-
 .../mr/steps/FactDistinctColumnsReducer.java       |  90 ++++---
 .../steps/FactDistinctColumnsReducerMapping.java   |  87 +++----
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  83 ++++---
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java     |  16 +-
 .../FactDistinctColumnsReducerMappingTest.java     |  12 +-
 .../cube/ssb_cube_with_dimention_range.json        | 110 +++++++++
 .../cube_desc/ssb_cube_with_dimention_range.json   | 269 +++++++++++++++++++++
 .../kylin/provision/BuildCubeWithEngine.java       |  86 +++++--
 25 files changed, 1312 insertions(+), 288 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d49c273..75c90a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -119,6 +119,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
 
+    @JsonProperty("dimension_range_info_map")
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+
     private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
 
     // lazy init
@@ -575,4 +579,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
         this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
     }
+
+    public Map<String, DimensionRangeInfo> getDimensionRangeInfoMap() {
+        return dimensionRangeInfoMap;
+    }
+
+    public void setDimensionRangeInfoMap(Map<String, DimensionRangeInfo> dimensionRangeInfoMap) {
+        this.dimensionRangeInfoMap = dimensionRangeInfoMap;
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
new file mode 100644
index 0000000..e36ca96
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class DimensionRangeInfo {
+
+    private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
+
+    public static Map<String, DimensionRangeInfo> mergeRangeMap(DataModelDesc model, Map<String, DimensionRangeInfo> m1,
+            Map<String, DimensionRangeInfo> m2) {
+        // Merge the per-column ranges of m1 and m2; only columns present in both maps are kept.
+        if (!m1.keySet().equals(m2.keySet())) {
+            logger.warn("Merging incompatible maps of DimensionRangeInfo, keys in m1 " + m1.keySet() + ", keys in m2 "
+                    + m2.keySet());
+        }
+
+        Map<String, DimensionRangeInfo> merged = Maps.newHashMap();
+
+        for (Map.Entry<String, DimensionRangeInfo> entry : m1.entrySet()) {
+            String colId = entry.getKey();
+            if (!m2.containsKey(colId))
+                continue; // column missing in m2, leave it out of the result
+
+            DimensionRangeInfo left = entry.getValue();
+            DimensionRangeInfo right = m2.get(colId);
+            if (left.getMin() == null && left.getMax() == null) {
+                merged.put(colId, right); // left side is all null or has 0 records
+                continue;
+            }
+            if (right.getMin() == null && right.getMax() == null) {
+                merged.put(colId, left); // right side is all null or has 0 records
+                continue;
+            }
+
+            DataTypeOrder order = model.findColumn(colId).getType().getOrder();
+            String lower = order.min(left.getMin(), right.getMin());
+            String upper = order.max(left.getMax(), right.getMax());
+            merged.put(colId, new DimensionRangeInfo(lower, upper));
+        }
+
+        return merged;
+    }
+    
+    // ============================================================================
+
+    @JsonProperty("min")
+    private String min;
+
+    @JsonProperty("max")
+    private String max;
+
+    public DimensionRangeInfo() {}
+
+    public DimensionRangeInfo(String min, String max) {
+        // min and max are either both null (empty segment or all-null column) or both set; reject a half-set range
+        if ((min == null) != (max == null))
+            throw new IllegalStateException("Inconsistent dimension range: min=" + min + ", max=" + max);
+        this.min = min;
+        this.max = max;
+    }
+
+    public String getMin() {
+        return min;
+    }
+
+    public String getMax() {
+        return max;
+    }
+    
+    @Override
+    public String toString() {
+        return "[" + min + ", " + max + "]";
+    }
+
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
new file mode 100644
index 0000000..098f334
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentPruner {
+    private static final Logger logger = LoggerFactory.getLogger(SegmentPruner.class);
+
+    final private Set<CompareTupleFilter> mustTrueCompares;
+
+    public SegmentPruner(TupleFilter filter) {
+        this.mustTrueCompares = filter == null ? Collections.<CompareTupleFilter> emptySet()
+                : filter.findMustTrueCompareFilters();
+    }
+
+    public List<CubeSegment> listSegmentsForQuery(CubeInstance cube) {
+        List<CubeSegment> r = new ArrayList<>();
+        for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+            if (check(seg))
+                r.add(seg);
+        }
+        return r;
+    }
+    
+    public boolean check(CubeSegment seg) {
+        // a segment without input records is pruned, unless the config insists on scanning it
+        if (seg.getInputRecords() == 0) {
+            if (seg.getConfig().isSkippingEmptySegments()) {
+                logger.debug("Prune segment {} due to 0 input record", seg);
+                return false;
+            }
+            logger.debug("Insist scan of segment {} having 0 input record", seg);
+        }
+
+        // each must-true comparison is tested against the known value range of its column
+        Map<String, DimensionRangeInfo> knownRanges = seg.getDimensionRangeInfoMap();
+        for (CompareTupleFilter compare : mustTrueCompares) {
+            TblColRef column = compare.getColumn();
+
+            DimensionRangeInfo range = knownRanges.get(column.getIdentity());
+            if (range == null) {
+                range = tryDeduceRangeFromPartitionCol(seg, column);
+                if (range == null)
+                    continue; // no range known for this column, it cannot rule the segment out
+            }
+
+            String lower = range.getMin();
+            String upper = range.getMax();
+            if (!satisfy(compare, lower, upper)) {
+                logger.debug("Prune segment {} due to given filter", seg);
+                return false;
+            }
+        }
+
+        logger.debug("Pruner passed on segment {}", seg);
+        return true;
+    }
+
+    private DimensionRangeInfo tryDeduceRangeFromPartitionCol(CubeSegment seg, TblColRef col) {
+        PartitionDesc partition = seg.getModel().getPartitionDesc();
+
+        // a range can only be deduced for the partition date column of a partitioned model
+        boolean isPartitionCol = partition.isPartitioned() && col.equals(partition.getPartitionDateColumnRef());
+        if (!isPartitionCol)
+            return null;
+
+        // derive the column range from the segment's TSRange
+        TSRange range = seg.getTSRange();
+        boolean infinite = range.start.isMin || range.end.isMax;
+        if (infinite)
+            return null; // DimensionRangeInfo cannot express infinite
+
+        String lower = tsRangeToStr(range.start.v, partition);
+        String upper = tsRangeToStr(range.end.v - 1, partition); // end side is exclusive, hence the -1
+        return new DimensionRangeInfo(lower, upper);
+    }
+
+    private String tsRangeToStr(long ts, PartitionDesc part) {
+        // format ts according to the data type of the partition date column
+        DataType type = part.getPartitionDateColumnRef().getType();
+
+        if (type.isDate())
+            return DateFormat.formatToDateStr(ts);
+
+        if (type.isTimeFamily())
+            return DateFormat.formatToTimeWithoutMilliStr(ts);
+
+        if (type.isStringFamily() || type.isIntegerFamily()) { // integer like 20160101
+            String pattern = part.getPartitionDateFormat();
+            if (StringUtils.isEmpty(pattern))
+                return "" + ts;
+            return DateFormat.formatToDateStr(ts, pattern);
+        }
+
+        throw new RuntimeException("Type " + type + " is not valid partition column type");
+    }
+
+    private boolean satisfy(CompareTupleFilter comp, String minVal, String maxVal) {
+        String filterVal = toString(comp.getFirstValue());
+        if (filterVal == null)
+            return true; // no constant value to compare with: nothing can be proven, so keep the segment
+        DataTypeOrder order = comp.getColumn().getType().getOrder();
+        // answer false only when [minVal, maxVal] proves that no value in the segment can match the filter
+        switch (comp.getOperator()) {
+        case EQ:
+        case IN:
+            String filterMin = order.min((Set<String>) comp.getValues());
+            String filterMax = order.max((Set<String>) comp.getValues());
+            return order.compare(filterMin, maxVal) <= 0 && order.compare(minVal, filterMax) <= 0;
+        case LT:
+            return order.compare(minVal, filterVal) < 0;
+        case LTE:
+            return order.compare(minVal, filterVal) <= 0;
+        case GT:
+            return order.compare(maxVal, filterVal) > 0;
+        case GTE:
+            return order.compare(maxVal, filterVal) >= 0;
+        case NEQ:
+        case NOTIN:
+        case ISNULL:
+        case ISNOTNULL:
+        default:
+            return true;
+        }
+    }
+
+    private String toString(Object v) {
+        return v == null ? null : v.toString();
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
index 811d512..97e4dc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.metadata.expression.TupleExpression;
@@ -49,14 +48,13 @@ public abstract class ScanRangePlannerBase {
     //GT 
     protected GTInfo gtInfo;
     protected TupleFilter gtFilter;
-    protected Pair<ByteArray, ByteArray> gtStartAndEnd;
-    protected TblColRef gtPartitionCol;
     protected ImmutableBitSet gtDimensions;
     protected ImmutableBitSet gtAggrGroups;
     protected ImmutableBitSet gtAggrMetrics;
     protected String[] gtAggrFuncs;
     protected TupleFilter havingFilter;
     protected boolean isPartitionColUsingDatetimeEncoding = true;
+    protected int onlyShardId = -1;
 
     protected RecordComparator rangeStartComparator;
     protected RecordComparator rangeEndComparator;
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
new file mode 100644
index 0000000..94acf9f
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DimensionRangeInfoTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMergeRangeMap() {
+        DataModelDesc model = DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model");
+        String colId = "TEST_KYLIN_FACT.CAL_DT";
+
+        // normal merge
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+            m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+            DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+            Assert.assertEquals("2012-01-01", r1.getMin());
+            Assert.assertEquals("2013-06-30", r1.getMax());
+        }
+        
+        // missing column on one side
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+
+            Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m1, m2).isEmpty());
+        }
+        
+        // null min/max value (happens on empty segment, or all-null columns)
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo(null, null));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+            m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+            DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+            Assert.assertEquals("2012-06-01", r1.getMin());
+            Assert.assertEquals("2013-06-30", r1.getMax());
+        }
+        
+    }
+}
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
new file mode 100644
index 0000000..7bf7a87
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.SetAndUnsetSystemProp;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class SegmentPrunerTest extends LocalFileMetadataTestCase {
+    private CubeInstance cube;
+
+    @Before
+    public void setUp() {
+        this.createTestMetadata();
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("ssb_cube_with_dimention_range");
+    }
+
+    @After
+    public void after() {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEmptySegment() {
+        CubeSegment seg = cube.getFirstSegment();
+        TblColRef col = cube.getModel().findColumn("CUSTOMER.C_NATION");
+
+        // a normal hit
+        TupleFilter f = compare(col, FilterOperatorEnum.EQ, "CHINA");
+        SegmentPruner segmentPruner = new SegmentPruner(f);
+        Assert.assertTrue(segmentPruner.check(seg));
+
+        // make the segment empty, it should be pruned
+        seg.setInputRecords(0);
+        Assert.assertFalse(segmentPruner.check(seg));
+    }
+
+    @Test
+    public void testDimensionRangeCheck() {
+        CubeSegment cubeSegment = cube.getSegments().getFirstSegment();
+
+        //integer
+        TblColRef qtyCol = cube.getModel().findColumn("V_LINEORDER.LO_QUANTITY");
+        TupleFilter constFilter_LO_QUANTITY0 = new ConstantTupleFilter(Sets.newHashSet("8", "18", "28"));//between min and max value
+        TupleFilter constFilter_LO_QUANTITY1 = new ConstantTupleFilter("1");//min value
+        TupleFilter constFilter_LO_QUANTITY2 = new ConstantTupleFilter("50");//max value
+        TupleFilter constFilter_LO_QUANTITY3 = new ConstantTupleFilter("0");//lt min value
+        TupleFilter constFilter_LO_QUANTITY4 = new ConstantTupleFilter("200");//gt max value
+        TupleFilter constFilter_LO_QUANTITY5 = new ConstantTupleFilter(Sets.newHashSet("51", "52", "53"));//gt max values
+
+        // is null
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.ISNULL);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lt min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //lte min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lt max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //gt max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //gte max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //gt min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //in over-max values
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY5);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //in normal values
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY0);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lte under-min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY3);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //gte over-max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY4);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+    }
+
+    @Test
+    public void testLegacyCubeSeg() {
+        // legacy cube segments do not have DimensionRangeInfo, but some pruning is still possible via their TSRange
+        CubeInstance cube = CubeManager.getInstance(getTestConfig())
+                .getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
+
+        TblColRef col = cube.getModel().findColumn("TEST_KYLIN_FACT.CAL_DT");
+        CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
+        TSRange tsRange = seg.getTSRange();
+        long start = tsRange.start.v;
+
+        try (SetAndUnsetSystemProp sns = new SetAndUnsetSystemProp("kylin.query.skip-empty-segments", "false")) {
+            {
+                TupleFilter f = compare(col, FilterOperatorEnum.LTE, start);
+                SegmentPruner segmentPruner = new SegmentPruner(f);
+                Assert.assertTrue(segmentPruner.check(seg));
+            }
+            {
+                TupleFilter f = compare(col, FilterOperatorEnum.LT, start);
+                SegmentPruner segmentPruner = new SegmentPruner(f);
+                Assert.assertFalse(segmentPruner.check(seg));
+            }
+        }
+    }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 16ea8ee..43b204a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * 
@@ -92,7 +93,7 @@ public abstract class TupleFilter {
     public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object val) {
         CompareTupleFilter r = new CompareTupleFilter(op);
         r.addChild(new ColumnTupleFilter(col));
-        r.addChild(new ConstantTupleFilter(val));
+        r.addChild(val instanceof ConstantTupleFilter ? (ConstantTupleFilter) val : new ConstantTupleFilter(val));
         return r;
     }
 
@@ -309,6 +310,31 @@ public abstract class TupleFilter {
         }
     }
 
+    // collect the compare filters (having a column) that are reached from this filter through AND nodes only
+    public Set<CompareTupleFilter> findMustTrueCompareFilters() {
+        Set<CompareTupleFilter> collected = Sets.newHashSet();
+        findMustTrueCompareFilters(this, collected);
+        return collected;
+    }
+
+    private void findMustTrueCompareFilters(TupleFilter filter, Set<CompareTupleFilter> result) {
+        if (filter instanceof CompareTupleFilter) {
+            CompareTupleFilter compare = (CompareTupleFilter) filter;
+            if (compare.getColumn() != null)
+                result.add(compare);
+            return;
+        }
+
+        // descend only into logical AND nodes; any other kind of filter ends the search on this branch
+        boolean isAnd = filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND;
+        if (!isAnd)
+            return;
+
+        for (TupleFilter child : filter.getChildren()) {
+            findMustTrueCompareFilters(child, result);
+        }
+    }
+    
     public abstract boolean isEvaluable();
 
     public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
new file mode 100644
index 0000000..7666ffe
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class DataTypeOrderTest {
+    @Test
+    public void testDataTypeOrder() {
+        DataType intType = DataType.getType("integer");
+        DataTypeOrder dataTypeOrder = intType.getOrder();
+        Set<String> integers = Sets.newHashSet("100000", "2", "1000", "100", "77", "10", "9", "2000000", "-10000", "0");
+        Assert.assertEquals("2000000", dataTypeOrder.max(integers));
+        Assert.assertEquals("-10000", dataTypeOrder.min(integers));
+
+        DataType doubleType = DataType.getType("double");
+        dataTypeOrder = doubleType.getOrder();
+        Set<String> doubels = Sets.newHashSet("1.1", "-299.5", "100000", "1.000", "4.000000001", "0.00", "-1000000.231231", "8000000",
+                "10", "10.00");
+        Assert.assertEquals("8000000", dataTypeOrder.max(doubels));
+        Assert.assertEquals("-1000000.231231", dataTypeOrder.min(doubels));
+
+        DataType datetimeType = DataType.getType("date");
+        dataTypeOrder = datetimeType.getOrder();
+        Set<String> datetimes = Sets.newHashSet("2010-01-02", "2888-08-09", "2018-05-26", "1527512082000", "2010-02-03 23:59:59",
+                "2000-12-12 12:00:00", "1970-01-19 00:18:32", "1998-12-02", "2018-05-28 10:00:00.255", "1995-09-20 20:00:00.220");
+        Assert.assertEquals("2888-08-09", dataTypeOrder.max(datetimes));
+        Assert.assertEquals("1970-01-19 00:18:32", dataTypeOrder.min(datetimes));
+
+        DataType stringType = new DataType("varchar", 256, 10);
+        dataTypeOrder = stringType.getOrder();
+        Set<String> strings = Sets.newHashSet(null, "", "中国", "China No.1", "神兽麒麟", "Rocket", "Apache Kylin", "google", "NULL",
+                "empty");
+        Assert.assertEquals("神兽麒麟", dataTypeOrder.max(strings));
+        Assert.assertEquals("", dataTypeOrder.min(strings));
+    }
+}
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
index b1e8f62..95609eb 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
@@ -116,4 +116,27 @@ public class TupleFilterTest {
         r.put(col2, v2);
         return r;
     }
+
+    @Test
+    public void testMustTrueTupleFilter() { // exercises TupleFilter.findMustTrueCompareFilters() on a small AND/OR tree
+        TupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND); // root of the tree
+        TupleFilter andFilter2  = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND); // nested AND child
+        TupleFilter orFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR); // nested OR child
+        andFilter.addChild(andFilter2);
+        andFilter.addChild(orFilter);
+
+        Set<CompareTupleFilter> trueTupleFilters = andFilter.findMustTrueCompareFilters(); // queried before any compare filter is attached
+        Assert.assertTrue(trueTupleFilters.isEmpty()); // only logical nodes in the tree so far, so nothing is expected back
+
+        TupleFilter compFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        compFilter.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test1", TblColRef.InnerDataTypeEnum.LITERAL)));
+        TupleFilter compFilter2 = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        compFilter2.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test2", TblColRef.InnerDataTypeEnum.LITERAL)));
+        andFilter2.addChild(compFilter); // compFilter is reached from the root through AND nodes only
+        orFilter.addChild(compFilter2); // compFilter2 is reached from the root through the OR node
+        Assert.assertEquals(Sets.newHashSet(compFilter), andFilter.findMustTrueCompareFilters()); // expects the AND-path filter only; the one under OR is excluded
+        
+        Assert.assertEquals(Sets.newHashSet(compFilter2), compFilter2.findMustTrueCompareFilters()); // a compare filter queried directly is expected to return itself
+    }
+    
 }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 5f86a45..229ef01 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -31,7 +31,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.FuzzyValueCombination;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,7 +38,6 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.RecordComparators;
 import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
-import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTInfo;
@@ -137,15 +135,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
         this.gtRtAggrMetrics = mapping.makeGridTableColumns(tmpRtAggrMetrics);
 
-        if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) {
-            int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef());
-            if (index >= 0) {
-                SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo);
-                this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index);
-                this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index);
-                this.gtPartitionCol = gtInfo.colRef(index);
-            }
-        }
+        this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
+        this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
+        this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);
     }
 
     protected StorageContext context;
@@ -153,7 +145,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     /**
      * Construct  GTScanRangePlanner with incomplete information. For UT only.
      */
-    public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
+    public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
         this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
         this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
@@ -170,8 +162,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
 
         this.gtFilter = gtFilter;
-        this.gtStartAndEnd = gtStartAndEnd;
-        this.gtPartitionCol = gtPartitionCol;
     }
 
     public GTScanRequest planScanRequest() {
@@ -237,18 +227,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
 
         for (ColumnRange range : andDimRanges) {
-            if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
-                int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
-                int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end);
-
-                if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) {
-                    //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition. 
-                } else {
-                    logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
-                            gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end));
-                    return null;
-                }
-            }
 
             int col = range.column.getColumnDesc().getZeroBasedIndex();
             if (!gtInfo.getPrimaryKey().get(col))
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 352a868..269833f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.RawQueryLastHacker;
+import org.apache.kylin.cube.common.SegmentPruner;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
@@ -87,14 +88,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
-        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+        SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
+        for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
             CubeSegmentScanner scanner;
 
-            if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
-                logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
-                continue;
-            }
-
             scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
                     request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
                     request.getMetrics(), request.getDynFuncs(), //
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 073c12c..08bcb65 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -32,7 +32,6 @@ import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
 import org.apache.kylin.dict.NumberDictionaryForestBuilder;
 import org.apache.kylin.dict.StringBytesConverter;
@@ -121,13 +120,11 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         ByteArray segmentStart = enc(info, 0, "2015-01-14");
         ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
-        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
         assertEquals(segmentStart, segmentStartX);
 
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -136,64 +133,57 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         }
         {
             LogicalTupleFilter filter = and(timeComp2, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp4, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
                     and(timeComp6, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
             assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-                    r.get(0).fuzzyKeys.toString());
+                    r.get(1).fuzzyKeys.toString());
         }
         {
             LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
         }
         {
             LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
-            assertEquals(0, r.get(0).fuzzyKeys.size());
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+            assertEquals(0, r.get(1).fuzzyKeys.size());
         }
         {
             //skip FALSE filter
             LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             //TRUE or FALSE filter
             LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -201,8 +191,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             //TRUE or other filter
             LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -211,12 +200,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void verifySegmentSkipping2() {
-        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
-
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -226,10 +212,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());//scan range are [close,close]
+            assertEquals(1, r.size());//scan range are [close,close]
         }
     }
 
@@ -239,7 +224,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // flatten or-and & hbase fuzzy value
         {
             LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
@@ -249,7 +234,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // pre-evaluate ever false
         {
             LogicalTupleFilter filter = and(timeComp1, timeComp2);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
@@ -257,7 +242,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // pre-evaluate ever true
         {
             LogicalTupleFilter filter = or(timeComp1, ageComp4);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
@@ -265,7 +250,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // merge overlap range
         {
             LogicalTupleFilter filter = or(timeComp1, timeComp3);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
@@ -274,7 +259,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
                     and(timeComp4, ageComp3));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(3, r.size());
             assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index f3bdabd..4305a25 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -104,7 +104,6 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
 
     private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
         int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
-        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase);
 
         job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 9bede82..4ea04d5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -55,8 +55,7 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
 
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
     }
 
     @Override
@@ -65,8 +64,6 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
         if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
             Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
             return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
-        } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) {
-            return reducerMapping.getReducerIdForDatePartitionColumn();
         } else {
             return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index f96944a..8f5d176 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -131,6 +131,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             throws IOException {
         FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
         int numberOfReducers = reducerMapping.getTotalReducerNum();
+        logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers);
         if (numberOfReducers > 250) {
             throw new IllegalArgumentException(
                     "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
@@ -141,7 +142,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         job.setReducerClass(FactDistinctColumnsReducer.class);
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);
-        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum());
 
         // make each reducer output to respective dir
         MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 569b810..fc9dc65 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -37,7 +37,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,9 +67,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
     private static final Text EMPTY_TEXT = new Text();
 
-    private int partitionColumnIndex = -1;
-    private boolean needFetchPartitionCol = true;
-
     private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
 
     @Override
@@ -96,18 +92,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
         }
 
-        TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-        if (partitionColRef != null) {
-            partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
-        }
-
-        // check whether need fetch the partition col values
-        if (partitionColumnIndex < 0) {
-            // if partition col not on cube, no need
-            needFetchPartitionCol = false;
-        } else {
-            needFetchPartitionCol = true;
-        }
         //for KYLIN-2518 backward compatibility
         boolean isUsePutRowKeyToHllNewAlgorithm;
         if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
@@ -172,12 +156,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
         for (String[] row : rowCollection) {
             context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
-            for (int i = 0; i < dictCols.size(); i++) {
-                String fieldValue = row[dictionaryColumnIndex[i]];
+            for (int i = 0; i < allDimDictCols.size(); i++) {
+                String fieldValue = row[columnIndex[i]];
                 if (fieldValue == null)
                     continue;
 
-                int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue);
+                int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
                 
                 tmpbuf.clear();
                 byte[] valueBytes = Bytes.toBytes(fieldValue);
@@ -188,15 +172,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
                 tmpbuf.put(valueBytes);
                 outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                DataType type = dictCols.get(i).getType();
+                DataType type = allDimDictCols.get(i).getType();
                 sortableKey.init(outputKey, type);
-                //judge type
                 context.write(sortableKey, EMPTY_TEXT);
 
                 // log a few rows for troubleshooting
                 if (rowCount < 10) {
                     logger.info(
-                            "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+                            "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
                 }
             }
 
@@ -204,22 +187,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 putRowKeyToHLL(row);
             }
 
-            if (needFetchPartitionCol == true) {
-                String fieldValue = row[partitionColumnIndex];
-                if (fieldValue != null) {
-                    tmpbuf.clear();
-                    byte[] valueBytes = Bytes.toBytes(fieldValue);
-                    int size = valueBytes.length + 1;
-                    if (size >= tmpbuf.capacity()) {
-                        tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
-                    }
-                    tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL);
-                    tmpbuf.put(valueBytes);
-                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                    sortableKey.init(outputKey, (byte) 0);
-                    context.write(sortableKey, EMPTY_TEXT);
-                }
-            }
             rowCount++;
         }
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index c66042b..ceddeb5 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -39,8 +39,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.collect.Lists;
-
 /**
  */
 abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
@@ -50,8 +48,8 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected CubeSegment cubeSeg;
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
-    protected List<TblColRef> dictCols;
     protected IMRTableInputFormat flatTableInputFormat;
+    protected List<TblColRef> allDimDictCols;
 
     protected Text outputKey = new Text();
     //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
@@ -59,7 +57,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected int errorRecordCounter = 0;
 
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
-    protected int[] dictionaryColumnIndex;
+    protected int[] columnIndex;
 
     protected FactDistinctColumnsReducerMapping reducerMapping;
     
@@ -74,20 +72,18 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
         cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+        allDimDictCols = reducerMapping.getAllDimDictCols();
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
         intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
-        dictionaryColumnIndex = new int[dictCols.size()];
-        for (int i = 0; i < dictCols.size(); i++) {
-            TblColRef colRef = dictCols.get(i);
+        columnIndex = new int[allDimDictCols.size()];
+        for (int i = 0; i < allDimDictCols.size(); i++) {
+            TblColRef colRef = allDimDictCols.get(i);
             int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
-            dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+            columnIndex[i] = columnIndexOnFlatTbl;
         }
-
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 801771a..61ba247 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -71,17 +70,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private boolean isStatistics = false;
     private KylinConfig cubeConfig;
     private int taskId;
-    private boolean isPartitionCol = false;
     private int rowCount = 0;
     private FactDistinctColumnsReducerMapping reducerMapping;
 
     //local build dict
     private boolean buildDictInReducer;
     private IDictionaryBuilder builder;
-    private long timeMaxValue = Long.MIN_VALUE;
-    private long timeMinValue = Long.MAX_VALUE;
+    private String maxValue = null;
+    private String minValue = null;
     public static final String DICT_FILE_POSTFIX = ".rldict";
-    public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
+    public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
 
     private MultipleOutputs mos;
 
@@ -97,11 +95,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         cubeConfig = cube.getConfig();
         cubeDesc = cube.getDescriptor();
 
-        int numberOfTasks = context.getNumReduceTasks();
         taskId = context.getTaskAttemptID().getTaskID().getId();
 
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
         
         logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
 
@@ -114,18 +110,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             samplingPercentage = Integer
                     .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
             logger.info("Reducer " + taskId + " handling stats");
-        } else if (reducerMapping.isPartitionColReducer(taskId)) {
-            // partition col
-            isPartitionCol = true;
-            col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (col == null) {
-                logger.info("No partition col. This reducer will do nothing");
-            } else {
-                logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity());
-            }
         } else {
             // normal col
-            col = reducerMapping.getDictColForReducer(taskId);
+            col = reducerMapping.getColForReducer(taskId);
             Preconditions.checkNotNull(col);
 
             // local build dict
@@ -133,7 +120,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
                 buildDictInReducer = false;
             }
-            if (reducerMapping.getReducerNumForDictCol(col) > 1) {
+            if (reducerMapping.getReducerNumForDimCol(col) > 1) {
                 buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
             }
             if (buildDictInReducer) {
@@ -167,24 +154,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                     cuboidHLLMap.put(cuboidId, hll);
                 }
             }
-        } else if (isPartitionCol) {
-            // partition col
+        } else {
             String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
             logAFewRows(value);
-            long time = DateFormat.stringToMillis(value);
-            timeMinValue = Math.min(timeMinValue, time);
-            timeMaxValue = Math.max(timeMaxValue, time);
-        } else {
-            // normal col
-            if (buildDictInReducer) {
-                String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
-                logAFewRows(value);
-                builder.addValue(value);
-            } else {
-                byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-                // output written to baseDir/colName/-r-00000 (etc)
-                String fileName = col.getIdentity() + "/";
-                mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+            // if dimension col, compute max/min value
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                if (minValue == null || col.getType().compare(minValue, value) > 0) {
+                    minValue = value;
+                }
+                if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+                    maxValue = value;
+                }
+            }
+
+            //if dict column
+            if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                if (buildDictInReducer) {
+                    builder.addValue(value);
+                } else {
+                    byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+                    // output written to baseDir/colName/-r-00000 (etc)
+                    String fileName = col.getIdentity() + "/";
+                    mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+                }
             }
         }
 
@@ -207,11 +199,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
 
             logMapperAndCuboidStatistics(allCuboids); // for human check
             outputStatistics(allCuboids);
-        } else if (isPartitionCol) {
-            // partition col
-            outputPartitionInfo();
         } else {
-            // normal col
+            //dimension col
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                outputDimRangeInfo();
+            }
+            // dict col
             if (buildDictInReducer) {
                 Dictionary<String> dict = builder.build();
                 outputDict(col, dict);
@@ -221,14 +214,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         mos.close();
     }
 
-    private void outputPartitionInfo() throws IOException, InterruptedException {
-        if (col != null) {
-            // output written to baseDir/colName/colName.pci-r-00000 (etc)
-            String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
-
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
-            logger.info("write partition info for col : " + col.getName() + "  minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+    private void outputDimRangeInfo() throws IOException, InterruptedException {
+        if (col != null && minValue != null) {
+            // output written to baseDir/colName/colName.dci-r-00000 (etc)
+            String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(minValue.getBytes()),
+                    dimRangeFileName);
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(maxValue.getBytes()),
+                    dimRangeFileName);
+            logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue + " maxValue:"
+                    + maxValue);
         }
     }
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
index 51594c3..b60e4ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
@@ -18,73 +18,74 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import com.google.common.collect.Lists;
+
 /**
  * Reducers play different roles based on reducer-id:
- * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer
- * - one reducer to get min/max of date partition column
+ * - (start from 0) one reducer for each dimension column or dictionary column; a UHC column may have more than one reducer
  * - (at the end) one or more reducers to collect row counts for cuboids using HLL
  */
 public class FactDistinctColumnsReducerMapping {
 
-    public static final int MARK_FOR_PARTITION_COL = -2;
     public static final int MARK_FOR_HLL_COUNTER = -1;
 
-    final private int nDictValueCollectors;
-    final private int datePartitionReducerId;
     final private int nCuboidRowCounters;
+    final private int nDimReducers;
     final private int nTotalReducers;
 
-    final private List<TblColRef> allDictCols;
-    final private int[] dictColIdToReducerBeginId;
+    final private List<TblColRef> allDimDictCols = Lists.newArrayList();
+    final private int[] colIdToReducerBeginId;
     final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers)
 
     public FactDistinctColumnsReducerMapping(CubeInstance cube) {
         this(cube, 0);
     }
 
-    public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
+    private FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
         CubeDesc desc = cube.getDescriptor();
+        Set<TblColRef> allCols = cube.getAllColumns();
+        Set<TblColRef> dictCols = desc.getAllColumnsNeedDictionaryBuilt();
+        List<TblColRef> dimCols = desc.listDimensionColumnsExcludingDerived(true);
+        for (TblColRef colRef : allCols) {
+            if (dictCols.contains(colRef)) {
+                allDimDictCols.add(colRef);
+            } else if (dimCols.indexOf(colRef) >= 0){
+                allDimDictCols.add(colRef);
+            }
+        }
 
-        allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt());
-
-        dictColIdToReducerBeginId = new int[allDictCols.size() + 1];
+        colIdToReducerBeginId = new int[allDimDictCols.size() + 1];
 
         int uhcReducerCount = cube.getConfig().getUHCReducerCount();
         List<TblColRef> uhcList = desc.getAllUHCColumns();
         int counter = 0;
-        for (int i = 0; i < allDictCols.size(); i++) {
-            dictColIdToReducerBeginId[i] = counter;
-            boolean isUHC = uhcList.contains(allDictCols.get(i));
+        for (int i = 0; i < allDimDictCols.size(); i++) {
+            colIdToReducerBeginId[i] = counter;
+            boolean isUHC = uhcList.contains(allDimDictCols.get(i));
             counter += (isUHC) ? uhcReducerCount : 1;
         }
-
-        dictColIdToReducerBeginId[allDictCols.size()] = counter;
-        nDictValueCollectors = counter;
-        datePartitionReducerId = counter;
+        colIdToReducerBeginId[allDimDictCols.size()] = counter;
+        nDimReducers = counter;
 
         nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? //
                 MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum;
-        nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters;
+        nTotalReducers = nDimReducers + nCuboidRowCounters;
 
         reducerRolePlay = new int[nTotalReducers];
         for (int i = 0, dictId = 0; i < nTotalReducers; i++) {
-            if (i > datePartitionReducerId) {
+            if (i >= nDimReducers) {
                 // cuboid HLL counter reducer
                 reducerRolePlay[i] = MARK_FOR_HLL_COUNTER;
-            } else if (i == datePartitionReducerId) {
-                // date partition min/max reducer
-                reducerRolePlay[i] = MARK_FOR_PARTITION_COL;
             } else {
-                // dict value collector reducer
-                if (i == dictColIdToReducerBeginId[dictId + 1])
+                if (i == colIdToReducerBeginId[dictId + 1])
                     dictId++;
 
                 reducerRolePlay[i] = dictId;
@@ -92,8 +93,8 @@ public class FactDistinctColumnsReducerMapping {
         }
     }
     
-    public List<TblColRef> getAllDictCols() {
-        return allDictCols;
+    public List<TblColRef> getAllDimDictCols() {
+        return allDimDictCols;
     }
     
     public int getTotalReducerNum() {
@@ -104,9 +105,9 @@ public class FactDistinctColumnsReducerMapping {
         return nCuboidRowCounters;
     }
 
-    public int getReducerIdForDictCol(int dictColId, Object fieldValue) {
-        int begin = dictColIdToReducerBeginId[dictColId];
-        int span = dictColIdToReducerBeginId[dictColId + 1] - begin;
+    public int getReducerIdForCol(int colId, Object fieldValue) {
+        int begin = colIdToReducerBeginId[colId];
+        int span = colIdToReducerBeginId[colId + 1] - begin;
         
         if (span == 1)
             return begin;
@@ -120,37 +121,29 @@ public class FactDistinctColumnsReducerMapping {
     }
 
     public int getRolePlayOfReducer(int reducerId) {
-        return reducerRolePlay[reducerId];
+        return reducerRolePlay[reducerId % nTotalReducers];
     }
     
     public boolean isCuboidRowCounterReducer(int reducerId) {
         return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER;
     }
-    
-    public boolean isPartitionColReducer(int reducerId) {
-        return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL;
-    }
 
-    public TblColRef getDictColForReducer(int reducerId) {
-        int role = getRolePlayOfReducer(reducerId);
+    public TblColRef getColForReducer(int reducerId) {
+        int role = getRolePlayOfReducer(reducerId % nTotalReducers);
         if (role < 0)
             throw new IllegalStateException();
         
-        return allDictCols.get(role);
-    }
-
-    public int getReducerNumForDictCol(TblColRef col) {
-        int dictColId = allDictCols.indexOf(col);
-        return dictColIdToReducerBeginId[dictColId + 1] - dictColIdToReducerBeginId[dictColId];
+        return allDimDictCols.get(role);
     }
 
-    public int getReducerIdForDatePartitionColumn() {
-        return datePartitionReducerId;
+    public int getReducerNumForDimCol(TblColRef col) {
+        int dictColId = allDimDictCols.indexOf(col);
+        return colIdToReducerBeginId[dictColId + 1] - colIdToReducerBeginId[dictColId];
     }
 
     public int getReducerIdForCuboidRowCount(long cuboidId) {
         int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters);
-        return datePartitionReducerId + 1 + rowCounterId;
+        return nDimReducers + rowCounterId;
     }
     
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f749c80..fdb19db 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -23,15 +23,18 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.LookupMaterializeContext;
@@ -40,11 +43,14 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
  */
 public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
@@ -76,9 +82,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
         try {
             saveExtSnapshotIfNeeded(cubeManager, cube, segment);
-            if (segment.isOffsetCube()) {
-                updateTimeRange(segment);
-            }
+            updateSegment(segment);
 
             cubeManager.promoteNewlyBuiltSegments(cube, segment);
             return new ExecuteResult();
@@ -115,40 +119,51 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         }
     }
 
-    private void updateTimeRange(CubeSegment segment) throws IOException {
+    private void updateSegment(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
 
-        if (partitionCol == null) {
-            return;
-        }
-        final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity());
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
-                partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
-        if (outputFile == null) {
-            throw new IOException("fail to find the partition file in base dir: " + colDir);
-        }
-
-        FSDataInputStream is = null;
-        BufferedReader bufferedReader = null;
-        InputStreamReader isr = null;
-        long minValue, maxValue;
-        try {
-            is = fs.open(outputFile);
-            isr = new InputStreamReader(is);
-            bufferedReader = new BufferedReader(isr);
-            minValue = Long.parseLong(bufferedReader.readLine());
-            maxValue = Long.parseLong(bufferedReader.readLine());
-        } finally {
-            IOUtils.closeQuietly(is);
-            IOUtils.closeQuietly(isr);
-            IOUtils.closeQuietly(bufferedReader);
-        }
+        for (TblColRef dimColRef : segment.getCubeDesc().listDimensionColumnsExcludingDerived(true)) {
+            final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+            Path colDir = new Path(factColumnsInputPath, dimColRef.getIdentity());
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+
+            // a column may have range files from multiple reducers; read all of them
+            Path[] outputFiles = HadoopUtil.getFilteredPath(fs, colDir,
+                    dimColRef.getName() + FactDistinctColumnsReducer.DIMENSION_COL_INFO_FILE_POSTFIX);
+            if (outputFiles == null || outputFiles.length == 0) {
+                segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(null, null));
+                continue;
+            }
 
-        logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue);
-        if (minValue != timeMinValue && maxValue != timeMaxValue) {
-            segment.setTSRange(new TSRange(minValue, maxValue + 1));
+            FSDataInputStream is = null;
+            BufferedReader bufferedReader = null;
+            InputStreamReader isr = null;
+            Set<String> minValues = Sets.newHashSet(), maxValues = Sets.newHashSet();
+            for (Path outputFile : outputFiles) {
+                try {
+                    is = fs.open(outputFile);
+                    isr = new InputStreamReader(is);
+                    bufferedReader = new BufferedReader(isr);
+                    minValues.add(bufferedReader.readLine());
+                    maxValues.add(bufferedReader.readLine());
+                } finally {
+                    IOUtils.closeQuietly(is);
+                    IOUtils.closeQuietly(isr);
+                    IOUtils.closeQuietly(bufferedReader);
+                }
+            }
+            DataTypeOrder order = dimColRef.getType().getOrder();
+            String minValue = order.min(minValues);
+            String maxValue = order.max(maxValues);
+            logger.info("updateSegment step. {} minValue:" + minValue + " maxValue:" + maxValue, dimColRef.getName());
+
+            if (segment.isOffsetCube() && partitionCol != null && partitionCol.getIdentity().equals(dimColRef.getIdentity())) {
+                logger.info("update partition. {} timeMinValue:" + minValue + " timeMaxValue:" + maxValue, dimColRef.getName());
+                if (DateFormat.stringToMillis(minValue) != timeMinValue && DateFormat.stringToMillis(maxValue) != timeMaxValue) {
+                    segment.setTSRange(new TSRange(DateFormat.stringToMillis(minValue), DateFormat.stringToMillis(maxValue) + 1));
+                }
+            }
+            segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(minValue, maxValue));
         }
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index e7aec8c..56f5077 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -20,10 +20,12 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -66,8 +68,19 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
 
         boolean isOffsetCube = mergedSegment.isOffsetCube();
         Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L;
+        Map<String, DimensionRangeInfo> mergedSegDimRangeMap = null;
         for (String id : mergingSegmentIds) {
             CubeSegment segment = cube.getSegmentById(id);
+            Map<String, DimensionRangeInfo> segDimRangeMap = segment.getDimensionRangeInfoMap();
+            if (segDimRangeMap.isEmpty()) {
+                continue;
+            }
+            if (mergedSegDimRangeMap == null) {
+                mergedSegDimRangeMap = segDimRangeMap;
+            } else {
+                mergedSegDimRangeMap = DimensionRangeInfo.mergeRangeMap(cube.getModel(), segDimRangeMap,
+                        mergedSegDimRangeMap);
+            }
             sourceCount += segment.getInputRecords();
             sourceSize += segment.getInputRecordsSize();
             tsStartMin = Math.min(tsStartMin, segment.getTSRange().start.v);
@@ -80,8 +93,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         mergedSegment.setInputRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
+        mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
 
-        if (isOffsetCube == true) {
+        if (isOffsetCube) {
             SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
             mergedSegment.setTSRange(tsRange);
         }
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index a6bc019..3c58b26 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -60,24 +60,22 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         int totalReducerNum = mapping.getTotalReducerNum();
         Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
         
-        // check partition column reducer & cuboid row count reducers
+        // check cuboid row count reducers
         Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
                 mapping.getRolePlayOfReducer(totalReducerNum - 1));
         Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
                 mapping.getRolePlayOfReducer(totalReducerNum - 2));
-        Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL,
-                mapping.getRolePlayOfReducer(totalReducerNum - 3));
         
         // check all dict column reducers
-        int dictEnd = totalReducerNum - 3;
+        int dictEnd = totalReducerNum - 2;
         for (int i = 0; i < dictEnd; i++)
             Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0);
         
         // check a UHC dict column
-        Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC));
+        Assert.assertEquals(2, mapping.getReducerNumForDimCol(aUHC));
         int uhcReducerBegin = -1;
         for (int i = 0; i < dictEnd; i++) {
-            if (mapping.getDictColForReducer(i).equals(aUHC)) {
+            if (mapping.getColForReducer(i).equals(aUHC)) {
                 uhcReducerBegin = i;
                 break;
             }
@@ -86,7 +84,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         int[] allRolePlay = mapping.getAllRolePlaysForReducers();
         Assert.assertEquals(allRolePlay[uhcReducerBegin], allRolePlay[uhcReducerBegin + 1]);
         for (int i = 0; i < 5; i++) {
-            int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, i);
+            int reducerId = mapping.getReducerIdForCol(uhcReducerBegin, i);
             Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= uhcReducerBegin + 1);
         }
     }
diff --git a/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..124081c
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
@@ -0,0 +1,110 @@
+{
+  "uuid" : "70a9f288-3c01-4745-a04b-5641e82d6c72",
+  "name" : "ssb_cube_with_dimention_range",
+  "owner" : "ADMIN",
+  "cost" : 50,
+  "status" : "DISABLED",
+  "segments" : [{
+    "uuid": "f34e2a88-50e4-4c8c-8dd1-64e922ce8c37",
+    "name": "19920523163722_20180523173026",
+    "storage_location_identifier": "KYLIN_KFV177RJOS",
+    "date_range_start": 706639042000,
+    "date_range_end": 1527096626000,
+    "source_offset_start": 0,
+    "source_offset_end": 0,
+    "status": "READY",
+    "size_kb": 100,
+    "input_records": 1000,
+    "input_records_size": 102400,
+    "last_build_time": 1527068515334,
+    "last_build_job_id": "a5010166-b64a-4289-ac56-064640f7f2fa",
+    "create_time_utc": 1527067828660,
+    "total_shards": 0,
+    "blackout_cuboids": [],
+    "binary_signature": null,
+    "dimension_range_info_map": {
+      "PART.P_BRAND": {
+        "min": "MFGR#1101",
+        "max": "MFGR#5540"
+      },
+      "V_LINEORDER.LO_QUANTITY": {
+        "min": "1",
+        "max": "50"
+      },
+      "PART.P_MFGR": {
+        "min": "MFGR#1",
+        "max": "MFGR#5"
+      },
+      "DATES.D_YEARMONTHNUM": {
+        "min": "199205",
+        "max": "199808"
+      },
+      "CUSTOMER.C_NATION": {
+        "min": "ALGERIA",
+        "max": "VIETNAM"
+      },
+      "DATES.D_YEARMONTH": {
+        "min": "Apr1993",
+        "max": "Sep1997"
+      },
+      "CUSTOMER.C_CUSTKEY": {
+        "min": "1",
+        "max": "2999"
+      },
+      "DATES.D_DATEKEY": {
+        "min": "19920523",
+        "max": "19980802"
+      },
+      "SUPPLIER.S_SUPPKEY": {
+        "min": "1",
+        "max": "200"
+      },
+      "SUPPLIER.S_REGION": {
+        "min": "AFRICA",
+        "max": "MIDDLE EAST"
+      },
+      "PART.P_PARTKEY": {
+        "min": "1",
+        "max": "20000"
+      },
+      "PART.P_CATEGORY": {
+        "min": "MFGR#11",
+        "max": "MFGR#55"
+      },
+      "DATES.D_WEEKNUMINYEAR": {
+        "min": "1",
+        "max": "53"
+      },
+      "CUSTOMER.C_CITY": {
+        "min": "ALGERIA  0",
+        "max": "VIETNAM  9"
+      },
+      "SUPPLIER.S_NATION": {
+        "min": "ALGERIA",
+        "max": "VIETNAM"
+      },
+      "SUPPLIER.S_CITY": {
+        "min": "ALGERIA  1",
+        "max": "VIETNAM  9"
+      },
+      "CUSTOMER.C_REGION": {
+        "min": "AFRICA",
+        "max": "MIDDLE EAST"
+      },
+      "DATES.D_YEAR": {
+        "min": "1992",
+        "max": "1998"
+      },
+      "V_LINEORDER.LO_DISCOUNT": {
+        "min": "0",
+        "max": "10"
+      }
+    }
+  } ],
+  "last_modified" : 1457534216410,
+  "descriptor" : "ssb_cube_with_dimention_range",
+  "create_time_utc" : 1457444500888,
+  "size_kb" : 100,
+  "input_records_count" : 1000,
+  "input_records_size" : 102400
+}
\ No newline at end of file
diff --git a/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..d91b420
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
@@ -0,0 +1,269 @@
+{
+  "uuid" : "5c44df30-daec-486e-af90-927bf7851060",
+  "name" : "ssb_cube_with_dimention_range",
+  "description" : "",
+  "dimensions" : [ {
+    "name" : "PART.P_MFGR",
+    "table" : "PART",
+    "column" : "P_MFGR",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEARMONTH",
+    "table" : "DATES",
+    "column" : "D_YEARMONTH",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_NATION",
+    "table" : "SUPPLIER",
+    "column" : "S_NATION",
+    "derived" : null
+  }, {
+    "name" : "V_LINEORDER.LO_DISCOUNT",
+    "table" : "V_LINEORDER",
+    "column" : "LO_DISCOUNT",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_CUSTKEY",
+    "table" : "CUSTOMER",
+    "column" : "C_CUSTKEY",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_NATION",
+    "table" : "CUSTOMER",
+    "column" : "C_NATION",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_REGION",
+    "table" : "SUPPLIER",
+    "column" : "S_REGION",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_CITY",
+    "table" : "CUSTOMER",
+    "column" : "C_CITY",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_SUPPKEY",
+    "table" : "SUPPLIER",
+    "column" : "S_SUPPKEY",
+    "derived" : null
+  }, {
+    "name" : "V_LINEORDER.LO_QUANTITY",
+    "table" : "V_LINEORDER",
+    "column" : "LO_QUANTITY",
+    "derived" : null
+  }, {
+    "name" : "PART.P_BRAND",
+    "table" : "PART",
+    "column" : "P_BRAND",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_CITY",
+    "table" : "SUPPLIER",
+    "column" : "S_CITY",
+    "derived" : null
+  }, {
+    "name" : "PART.P_PARTKEY",
+    "table" : "PART",
+    "column" : "P_PARTKEY",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEARMONTHNUM",
+    "table" : "DATES",
+    "column" : "D_YEARMONTHNUM",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_WEEKNUMINYEAR",
+    "table" : "DATES",
+    "column" : "D_WEEKNUMINYEAR",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_REGION",
+    "table" : "CUSTOMER",
+    "column" : "C_REGION",
+    "derived" : null
+  }, {
+    "name" : "PART.P_CATEGORY",
+    "table" : "PART",
+    "column" : "P_CATEGORY",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEAR",
+    "table" : "DATES",
+    "column" : "D_YEAR",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_DATEKEY",
+    "table" : "DATES",
+    "column" : "D_DATEKEY",
+    "derived" : null
+  } ],
+  "measures" : [ {
+    "name" : "_COUNT_",
+    "function" : {
+      "expression" : "COUNT",
+      "parameter" : {
+        "type" : "constant",
+        "value" : "1",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  }, {
+    "name" : "TOTAL_REVENUE",
+    "function" : {
+      "expression" : "SUM",
+      "parameter" : {
+        "type" : "column",
+        "value" : "LO_REVENUE",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  }, {
+    "name" : "TOTAL_SUPPLYCOST",
+    "function" : {
+      "expression" : "SUM",
+      "parameter" : {
+        "type" : "column",
+        "value" : "LO_SUPPLYCOST",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  } ],
+  "rowkey" : {
+    "rowkey_columns" : [ {
+      "column" : "PART.P_MFGR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEARMONTHNUM",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_REGION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "V_LINEORDER.LO_QUANTITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEARMONTH",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_WEEKNUMINYEAR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_REGION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "V_LINEORDER.LO_DISCOUNT",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_CITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_CATEGORY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEAR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_BRAND",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_CITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_NATION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_NATION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_DATEKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_SUPPKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_CUSTKEY",
+      "encoding" : "dict",
+      "isShardBy" : true,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_PARTKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    } ]
+  },
+  "signature" : "",
+  "last_modified" : 1457503036686,
+  "model_name" : "ssb",
+  "null_string" : null,
+  "hbase_mapping" : {
+    "column_family" : [ {
+      "name" : "F1",
+      "columns" : [ {
+        "qualifier" : "M",
+        "measure_refs" : [ "_COUNT_", "TOTAL_REVENUE", "TOTAL_SUPPLYCOST"]
+      } ]
+    } ]
+  },
+  "aggregation_groups" : [ {
+    "includes" : [ "SUPPLIER.S_REGION", "CUSTOMER.C_NATION", "SUPPLIER.S_NATION", "CUSTOMER.C_REGION", "DATES.D_YEAR", "DATES.D_WEEKNUMINYEAR", "DATES.D_YEARMONTH", "DATES.D_YEARMONTHNUM", "SUPPLIER.S_CITY" ],
+    "select_rule" : {
+      "hierarchy_dims" : [ [ "S_REGION", "S_NATION", "S_CITY" ] ],
+      "mandatory_dims" : [ "DATES.D_YEAR" ],
+      "joint_dims" : [ ]
+    }
+  } ],
+  "notify_list" : [ ],
+  "status_need_notify" : [ ],
+  "partition_date_start" : 3153000000000,
+  "partition_date_end" : 3153600000000,
+  "auto_merge_time_ranges" : [ 604800000, 2419200000 ],
+  "retention_range" : 0,
+  "engine_type" : 2,
+  "storage_type" : 2,
+  "override_kylin_properties" : {
+    "kylin.storage.hbase.compression-codec" : "lz4",
+    "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+  }
+}
\ No newline at end of file
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index afd9788..2f22bd4 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,15 +18,31 @@
 
 package org.apache.kylin.provision;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
@@ -34,6 +50,7 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
@@ -48,6 +65,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
@@ -55,21 +73,9 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class BuildCubeWithEngine {
 
@@ -304,21 +310,29 @@ public class BuildCubeWithEngine {
         
         if (!buildSegment(cubeName, date1, date2))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date2, date3))
             return false;
         if (!optimizeCube(cubeName))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date3, date4))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date4, date5)) // one empty segment
             return false;
+        checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date5, date6)) // another empty segment
             return false;
+        checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
+
 
         if (!mergeSegment(cubeName, date2, date4)) // merge 2 normal segments
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!mergeSegment(cubeName, date2, date5)) // merge normal and empty
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         
         // now have 2 normal segments [date1, date2) [date2, date5) and 1 empty segment [date5, date6)
         return true;
@@ -460,4 +474,69 @@ public class BuildCubeWithEngine {
         }
     }
 
+    /**
+     * Verifies that the segment picked by {@link #getLastModifiedSegment(CubeInstance)}, which the
+     * caller expects to be empty, carries no min/max value in any of its dimension ranges.
+     *
+     * @param cube cube whose most recently built segment is checked
+     * @throws RuntimeException if any dimension range has a non-null min or max
+     */
+    private void checkEmptySegRangeInfo(CubeInstance cube) {
+        CubeSegment segment = getLastModifiedSegment(cube);
+        for (DimensionRangeInfo range : segment.getDimensionRangeInfoMap().values()) {
+            if (range.getMax() != null || range.getMin() != null) {
+                throw new RuntimeException("Empty segment must have null info.");
+            }
+        }
+    }
+
+    /**
+     * Verifies that, for a partitioned model, the min/max recorded for the partition date column
+     * of the most recently built segment lie inside that segment's time range.
+     *
+     * @param cube cube whose most recently built segment is checked
+     * @throws RuntimeException if the range info is missing or falls outside [start, end)
+     */
+    private void checkNormalSegRangeInfo(CubeInstance cube) {
+        CubeSegment segment = getLastModifiedSegment(cube);
+        if (!segment.getModel().getPartitionDesc().isPartitioned()) {
+            return; // no partition column, nothing to compare against the time range
+        }
+
+        TblColRef colRef = segment.getModel().getPartitionDesc().getPartitionDateColumnRef();
+        DimensionRangeInfo dmRangeInfo = segment.getDimensionRangeInfoMap().get(colRef.getIdentity());
+        // Report a missing range explicitly rather than failing later with a bare NullPointerException.
+        if (dmRangeInfo == null || dmRangeInfo.getMin() == null || dmRangeInfo.getMax() == null) {
+            throw new RuntimeException(String.format(
+                    "Build cube failed, missing partition column min/max value. Segment: %s, column: %s",
+                    segment, colRef.getIdentity()));
+        }
+
+        long min_v = DateFormat.stringToMillis(dmRangeInfo.getMin());
+        long max_v = DateFormat.stringToMillis(dmRangeInfo.getMax());
+        long ts_range_start = segment.getTSRange().start.v;
+        long ts_range_end = segment.getTSRange().end.v;
+        // The check treats the end of the time range as exclusive: the largest admissible value is end - 1.
+        if (min_v < ts_range_start || max_v > ts_range_end - 1) {
+            throw new RuntimeException(String.format(
+                    "Build cube failed, wrong partition column min/max value."
+                            + " Segment: %s, min value: %s, TsRange.start: %s, max value: %s, TsRange.end: %s",
+                    segment, min_v, ts_range_start, max_v, ts_range_end));
+        }
+    }
+
+    /**
+     * Returns the segment of {@code cube} with the greatest last-build time.
+     *
+     * @param cube cube to search; must contain at least one segment, otherwise
+     *             {@link Collections#max} throws {@link java.util.NoSuchElementException}
+     * @return the segment whose {@code getLastBuildTime()} is largest
+     */
+    private CubeSegment getLastModifiedSegment(CubeInstance cube) {
+        return Collections.max(cube.getSegments(), new Comparator<CubeSegment>() {
+            @Override
+            public int compare(CubeSegment o1, CubeSegment o2) {
+                return Long.compare(o1.getLastBuildTime(), o2.getLastBuildTime());
+            }
+        });
+    }
 }