You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/08 09:03:06 UTC

[kylin] branch master updated (59354ac -> 31073af)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 59354ac  KYLIN-3428 too large dict file cause OOM
     new 1845ee0  KYLIN-3391 BadQueryDetector only detect first query
     new fdbe369  KYLIN-3390 QueryInterceptorUtil.queryInterceptors is not thread safe
     new f487d80  KYLIN-2662 fix NegativeArraySizeException in TrieDictionaryForest
     new 05baf21  KYLIN-3370 enhance segment pruning
     new d1ed107  KYLIN-3423 Performance improvement in FactDistinctColumnsMapper
     new 31073af  KYLIN-3403 Use IntegerCodeSystem for date type filter

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |   5 +-
 ...ception.java => TooBigDictionaryException.java} |  16 +-
 .../java/org/apache/kylin/cube/CubeSegment.java    |  12 +
 .../org/apache/kylin/cube/DimensionRangeInfo.java  | 112 +++++++++
 .../apache/kylin/cube/common/SegmentPruner.java    | 179 ++++++++++++++
 .../kylin/cube/gridtable/ScanRangePlannerBase.java |   4 +-
 .../inmemcubing/InputConverterUnitForRawData.java  |   2 -
 .../apache/kylin/cube/kv/RowKeyColumnOrder.java    | 108 --------
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  13 +-
 .../apache/kylin/cube/DimensionRangeInfoTest.java  |  87 +++++++
 .../kylin/cube/common/SegmentPrunerTest.java       | 195 +++++++++++++++
 .../java/org/apache/kylin/dict/TrieDictionary.java |   3 +
 .../apache/kylin/dict/TrieDictionaryBuilder.java   |   2 +-
 .../apache/kylin/dict/TrieDictionaryForest.java    |   2 +-
 .../kylin/dict/TrieDictionaryForestBuilder.java    |  17 +-
 .../apache/kylin/metadata/datatype/DataType.java   |  26 +-
 .../kylin/metadata/datatype/DataTypeOrder.java     | 155 ++++++++++++
 .../kylin/metadata/filter/CompareTupleFilter.java  |   2 +-
 .../metadata/filter/FilterCodeSystemFactory.java   |   2 +
 .../apache/kylin/metadata/filter/TupleFilter.java  | 116 +++++++--
 .../kylin/metadata/datatype/DataTypeOrderTest.java |  57 +++++
 .../kylin/metadata/filter/TupleFilterTest.java     |  77 ++++++
 .../storage/gtrecord/CubeScanRangePlanner.java     |  30 +--
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |   9 +-
 .../kylin/storage/translate/ColumnValueRange.java  | 214 ----------------
 .../storage/translate/DerivedFilterTranslator.java |   6 +-
 .../kylin/storage/translate/HBaseKeyRange.java     | 273 ---------------------
 .../kylin/storage/gtrecord/DictGridTableTest.java  |  69 ++----
 .../storage/translate/ColumnValueRangeTest.java    | 126 ----------
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   3 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java      |   9 +-
 .../mr/steps/CalculateStatsFromBaseCuboidJob.java  |   1 -
 .../mr/steps/FactDistinctColumnPartitioner.java    |   5 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java    |   2 +-
 .../engine/mr/steps/FactDistinctColumnsMapper.java | 160 ++++++++----
 .../mr/steps/FactDistinctColumnsMapperBase.java    |  21 +-
 .../mr/steps/FactDistinctColumnsReducer.java       |  90 ++++---
 .../steps/FactDistinctColumnsReducerMapping.java   |  87 +++----
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  83 ++++---
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java     |  20 +-
 .../mr/steps/UpdateCubeInfoAfterOptimizeStep.java  |   1 +
 .../FactDistinctColumnsReducerMappingTest.java     |  16 +-
 .../cube/ssb_cube_with_dimention_range.json        | 110 +++++++++
 .../cube_desc/ssb_cube_with_dimention_range.json   | 269 ++++++++++++++++++++
 .../kylin/provision/BuildCubeWithEngine.java       |  87 +++++--
 .../query/sql/{query93.sql => query112.sql}        |  11 +-
 .../kylin/engine/mr/steps/DictColDeduperTest.java  |  65 +++++
 .../query/optrule/AggregateMultipleExpandRule.java |  16 +-
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |  26 +-
 .../kylin/query/security/QueryInterceptorUtil.java |   8 +-
 .../kylin/rest/service/BadQueryDetector.java       |   5 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |   1 -
 .../apache/kylin/source/hive/HiveTableReader.java  |   2 +-
 53 files changed, 1888 insertions(+), 1129 deletions(-)
 copy core-common/src/main/java/org/apache/kylin/common/exceptions/{ResourceLimitExceededException.java => TooBigDictionaryException.java} (75%)
 create mode 100644 core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
 create mode 100644 core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
 delete mode 100644 core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
 create mode 100644 core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
 create mode 100644 core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java
 create mode 100644 core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
 delete mode 100644 core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
 delete mode 100644 core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
 delete mode 100644 core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
 create mode 100644 examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
 create mode 100644 examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
 copy kylin-it/src/test/resources/query/sql/{query93.sql => query112.sql} (88%)
 create mode 100644 kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java


[kylin] 03/06: KYLIN-2662 fix NegativeArraySizeException in TrieDictionaryForest

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f487d803a0505bb11dfadce90aee12d8d20042f5
Author: Li Yang <li...@apache.org>
AuthorDate: Sat May 26 14:54:28 2018 +0800

    KYLIN-2662 fix NegativeArraySizeException in TrieDictionaryForest
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../exceptions/TooBigDictionaryException.java      | 34 ++++++++++++++++++++++
 .../java/org/apache/kylin/dict/TrieDictionary.java |  3 ++
 .../apache/kylin/dict/TrieDictionaryBuilder.java   |  2 +-
 .../apache/kylin/dict/TrieDictionaryForest.java    |  2 +-
 .../kylin/dict/TrieDictionaryForestBuilder.java    | 17 +++++++++--
 5 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/exceptions/TooBigDictionaryException.java b/core-common/src/main/java/org/apache/kylin/common/exceptions/TooBigDictionaryException.java
new file mode 100644
index 0000000..ab08c85
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/exceptions/TooBigDictionaryException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.exceptions;
+
+/**
+ * @author TinChiWay
+ * @date 2018/4/2
+ */
+@SuppressWarnings("serial")
+public class TooBigDictionaryException extends RuntimeException {
+    public TooBigDictionaryException(String message, Exception e) {
+        super(message, e);
+    }
+
+    public TooBigDictionaryException(String message) {
+        super(message);
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 754f451..d531c05 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -146,6 +146,9 @@ public class TrieDictionary<T> extends CacheDictionary<T> {
         return maxValueLength;
     }
 
+    public int getStorageSizeInBytes() {
+        return trieBytes.length;
+    }
 
     @Override
     protected int getIdFromValueBytesWithoutCache(byte[] value, int offset, int len, int roundingFlag) {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
index 18169ca..b3440a1 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
@@ -39,7 +39,7 @@ import org.apache.kylin.common.util.BytesUtil;
  */
 public class TrieDictionaryBuilder<T> {
 
-    private static final int _2GB = 2000000000;
+    public static final int _2GB = 2000000000;
 
     public static class Node {
         public byte[] part;
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
index 09d5bc2..4642cf4 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
@@ -189,8 +189,8 @@ public class TrieDictionaryForest<T> extends CacheDictionary<T> {
         //write tree size
         headOut.writeInt(trees.size());
         headOut.close();
-        byte[] head = byteBuf.toByteArray();
         //output
+        byte[] head = byteBuf.toByteArray();
         out.writeInt(head.length);
         out.write(head);
     }
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
index 0e5e63e..29cff2e 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
@@ -17,13 +17,14 @@
 */
 package org.apache.kylin.dict;
 
+import java.util.ArrayList;
+
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exceptions.TooBigDictionaryException;
 import org.apache.kylin.common.util.ByteArray;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-
 /**
  * Build a trie dictionary forest if the input values is ordered, or the forest falls back to a single trie.
  */
@@ -134,6 +135,18 @@ public class TrieDictionaryForestBuilder<T> {
         byte[] valueBytes = tree.getValueBytesFromIdWithoutCache(minId);
         valueDivide.add(new ByteArray(valueBytes, 0, valueBytes.length));
         curOffset += (tree.getMaxId() + 1);
+        
+        checkDictSize();
+    }
+
+    private void checkDictSize() {
+        // due to the limitation of resource store, no dictionary beyond 2GB is allowed
+        long size = 0;
+        for (TrieDictionary trie : trees) {
+            size += trie.getStorageSizeInBytes();
+        }
+        if (size > TrieDictionaryBuilder._2GB)
+            throw new TooBigDictionaryException("Too big dictionary, dictionary cannot be bigger than 2GB");
     }
 
     private void reset() {


[kylin] 01/06: KYLIN-3391 BadQueryDetector only detect first query

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1845ee0ce3b8f775e05443fed1b4d3e860ae7503
Author: yiming.xu <10...@qq.com>
AuthorDate: Mon May 28 21:15:43 2018 +0800

    KYLIN-3391 BadQueryDetector only detect first query
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../src/main/java/org/apache/kylin/common/KylinConfigBase.java       | 5 ++++-
 .../main/java/org/apache/kylin/rest/service/BadQueryDetector.java    | 5 ++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3ae6c2d..637502e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1328,9 +1328,12 @@ abstract public class KylinConfigBase implements Serializable {
     public int getBadQueryDefaultAlertingSeconds() {
         return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90"));
     }
+    public double getBadQueryDefaultAlertingCoefficient() {
+        return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5"));
+    }
 
     public int getBadQueryDefaultDetectIntervalSeconds() {
-        int time = getQueryTimeoutSeconds() / 2; // half of query timeout
+        int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout
         if (time == 0) {
             time = 60; // 60 sec
         }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
index 51f49a7..3f3db45 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java
@@ -144,6 +144,7 @@ public class BadQueryDetector extends Thread {
     }
 
     private void detectBadQuery() {
+        logger.info("Detect bad query.");
         long now = System.currentTimeMillis();
         ArrayList<Entry> entries = new ArrayList<Entry>(runningQueries.values());
         Collections.sort(entries);
@@ -156,8 +157,6 @@ public class BadQueryDetector extends Thread {
             if (runningSec >= alertRunningSec) {
                 notify(BadQueryEntry.ADJ_SLOW, e);
                 dumpStackTrace(e.thread, e.queryId);
-            } else {
-                break; // entries are sorted by startTime
             }
         }
 
@@ -170,7 +169,7 @@ public class BadQueryDetector extends Thread {
     private void setQueryThreadInterrupted(Entry e, float runningSec) {
         if (queryTimeoutSeconds != 0 && runningSec >= queryTimeoutSeconds) {
             e.thread.interrupt();
-            logger.error("Trying to cancel query:" + e.thread.getName());
+            logger.error("Query running "+ runningSec + "s, Trying to cancel query:" + e.thread.getName());
         }
     }
 


[kylin] 02/06: KYLIN-3390 QueryInterceptorUtil.queryInterceptors is not thread safe

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit fdbe369d1187dae8c7157ef90234e6855c9eee60
Author: yiming.xu <10...@qq.com>
AuthorDate: Mon May 28 21:16:27 2018 +0800

    KYLIN-3390 QueryInterceptorUtil.queryInterceptors is not thread safe
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/query/security/QueryInterceptorUtil.java     | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/query/src/main/java/org/apache/kylin/query/security/QueryInterceptorUtil.java b/query/src/main/java/org/apache/kylin/query/security/QueryInterceptorUtil.java
index 6ce4c8e..3641483 100644
--- a/query/src/main/java/org/apache/kylin/query/security/QueryInterceptorUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/security/QueryInterceptorUtil.java
@@ -26,11 +26,7 @@ import org.apache.kylin.common.util.ClassUtil;
 
 public class QueryInterceptorUtil {
     private static List<QueryInterceptor> queryInterceptors = new ArrayList<>();
-
-    private static void setQueryInterceptor() {
-        if (queryInterceptors.size() > 0) {
-            return;
-        }
+    static {
         String[] classes = KylinConfig.getInstanceFromEnv().getQueryInterceptors();
         for (String clz : classes) {
             try {
@@ -43,8 +39,6 @@ public class QueryInterceptorUtil {
     }
 
     public static List<QueryInterceptor> getQueryInterceptors() {
-        setQueryInterceptor();
         return queryInterceptors;
     }
-
 }
\ No newline at end of file


[kylin] 04/06: KYLIN-3370 enhance segment pruning

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 05baf2123fd0b0fcfee613e1fe18c93722c8ed9b
Author: Li Yang <li...@apache.org>
AuthorDate: Tue May 22 12:00:19 2018 +0800

    KYLIN-3370 enhance segment pruning
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../java/org/apache/kylin/cube/CubeSegment.java    |  12 +
 .../org/apache/kylin/cube/DimensionRangeInfo.java  | 104 ++++++++
 .../apache/kylin/cube/common/SegmentPruner.java    | 179 ++++++++++++++
 .../kylin/cube/gridtable/ScanRangePlannerBase.java |   4 +-
 .../inmemcubing/InputConverterUnitForRawData.java  |   2 -
 .../apache/kylin/cube/kv/RowKeyColumnOrder.java    | 108 --------
 .../kylin/cube/model/CubeJoinedFlatTableDesc.java  |  13 +-
 .../apache/kylin/cube/DimensionRangeInfoTest.java  |  87 +++++++
 .../kylin/cube/common/SegmentPrunerTest.java       | 195 +++++++++++++++
 .../apache/kylin/metadata/datatype/DataType.java   |  26 +-
 .../kylin/metadata/datatype/DataTypeOrder.java     | 155 ++++++++++++
 .../kylin/metadata/filter/CompareTupleFilter.java  |   2 +-
 .../apache/kylin/metadata/filter/TupleFilter.java  | 116 +++++++--
 .../kylin/metadata/datatype/DataTypeOrderTest.java |  57 +++++
 .../kylin/metadata/filter/TupleFilterTest.java     |  77 ++++++
 .../storage/gtrecord/CubeScanRangePlanner.java     |  30 +--
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |   9 +-
 .../kylin/storage/translate/ColumnValueRange.java  | 214 ----------------
 .../storage/translate/DerivedFilterTranslator.java |   6 +-
 .../kylin/storage/translate/HBaseKeyRange.java     | 273 ---------------------
 .../kylin/storage/gtrecord/DictGridTableTest.java  |  69 ++----
 .../storage/translate/ColumnValueRangeTest.java    | 126 ----------
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   3 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java      |   9 +-
 .../mr/steps/CalculateStatsFromBaseCuboidJob.java  |   1 -
 .../mr/steps/FactDistinctColumnPartitioner.java    |   5 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java    |   2 +-
 .../engine/mr/steps/FactDistinctColumnsMapper.java |  43 +---
 .../mr/steps/FactDistinctColumnsMapperBase.java    |  20 +-
 .../mr/steps/FactDistinctColumnsReducer.java       |  90 ++++---
 .../steps/FactDistinctColumnsReducerMapping.java   |  87 +++----
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  83 ++++---
 .../mr/steps/UpdateCubeInfoAfterMergeStep.java     |  20 +-
 .../mr/steps/UpdateCubeInfoAfterOptimizeStep.java  |   1 +
 .../FactDistinctColumnsReducerMappingTest.java     |  12 +-
 .../cube/ssb_cube_with_dimention_range.json        | 110 +++++++++
 .../cube_desc/ssb_cube_with_dimention_range.json   | 269 ++++++++++++++++++++
 .../kylin/provision/BuildCubeWithEngine.java       |  87 +++++--
 .../query/optrule/AggregateMultipleExpandRule.java |  16 +-
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |  26 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |   1 -
 .../apache/kylin/source/hive/HiveTableReader.java  |   2 +-
 42 files changed, 1669 insertions(+), 1082 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index d49c273..75c90a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -119,6 +119,10 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private Map<String, String> additionalInfo = new LinkedHashMap<String, String>();
 
+    @JsonProperty("dimension_range_info_map")
+    @JsonInclude(JsonInclude.Include.NON_EMPTY)
+    private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
+
     private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
 
     // lazy init
@@ -575,4 +579,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     public void setSourcePartitionOffsetStart(Map<Integer, Long> sourcePartitionOffsetStart) {
         this.sourcePartitionOffsetStart = sourcePartitionOffsetStart;
     }
+
+    public Map<String, DimensionRangeInfo> getDimensionRangeInfoMap() {
+        return dimensionRangeInfoMap;
+    }
+
+    public void setDimensionRangeInfoMap(Map<String, DimensionRangeInfo> dimensionRangeInfoMap) {
+        this.dimensionRangeInfoMap = dimensionRangeInfoMap;
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
new file mode 100644
index 0000000..e36ca96
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class DimensionRangeInfo {
+
+    private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
+
+    public static Map<String, DimensionRangeInfo> mergeRangeMap(DataModelDesc model, Map<String, DimensionRangeInfo> m1,
+            Map<String, DimensionRangeInfo> m2) {
+
+        if (!m1.keySet().equals(m2.keySet())) {
+            logger.warn("Merging incompatible maps of DimensionRangeInfo, keys in m1 " + m1.keySet() + ", keys in m2 "
+                    + m2.keySet());
+        }
+
+        Map<String, DimensionRangeInfo> result = Maps.newHashMap();
+        
+        for (String colId : m1.keySet()) {
+            if (!m2.containsKey(colId))
+                continue;
+
+            DimensionRangeInfo r1 = m1.get(colId);
+            DimensionRangeInfo r2 = m2.get(colId);
+            
+            DimensionRangeInfo newR;
+            if (r1.getMin() == null && r1.getMax() == null) {
+                newR = r2; // when r1 is all null or has 0 records
+            } else if (r2.getMin() == null && r2.getMax() == null) {
+                newR = r1; // when r2 is all null or has 0 records
+            } else {
+                DataTypeOrder order = model.findColumn(colId).getType().getOrder();
+                String newMin = order.min(r1.getMin(), r2.getMin());
+                String newMax = order.max(r1.getMax(), r2.getMax());
+                newR = new DimensionRangeInfo(newMin, newMax);
+            }
+            
+            result.put(colId, newR);
+        }
+        
+        return result;
+    }
+    
+    // ============================================================================
+
+    @JsonProperty("min")
+    private String min;
+
+    @JsonProperty("max")
+    private String max;
+
+    public DimensionRangeInfo() {}
+
+    public DimensionRangeInfo(String min, String max) {
+        if (min == null && max != null || min != null && max == null)
+            throw new IllegalStateException();
+        
+        this.min = min;
+        this.max = max;
+    }
+
+    public String getMin() {
+        return min;
+    }
+
+    public String getMax() {
+        return max;
+    }
+    
+    @Override
+    public String toString() {
+        return "[" + min + ", " + max + "]";
+    }
+
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
new file mode 100644
index 0000000..de77511
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/SegmentPruner.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentPruner {
+    private static final Logger logger = LoggerFactory.getLogger(SegmentPruner.class);
+
+    final private Set<CompareTupleFilter> mustTrueCompares;
+
+    public SegmentPruner(TupleFilter filter) {
+        this.mustTrueCompares = filter == null ? Collections.<CompareTupleFilter> emptySet()
+                : filter.findMustTrueCompareFilters();
+    }
+
+    public List<CubeSegment> listSegmentsForQuery(CubeInstance cube) {
+        List<CubeSegment> r = new ArrayList<>();
+        for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) {
+            if (check(seg))
+                r.add(seg);
+        }
+        return r;
+    }
+    
+    public boolean check(CubeSegment seg) {
+        
+        if (seg.getInputRecords() == 0) {
+            if (seg.getConfig().isSkippingEmptySegments()) {
+                logger.debug("Prune segment {} due to 0 input record", seg);
+                return false;
+            } else {
+                logger.debug("Insist scan of segment {} having 0 input record", seg);
+            }
+        }
+
+        Map<String, DimensionRangeInfo> segDimRangInfoMap = seg.getDimensionRangeInfoMap();
+        for (CompareTupleFilter comp : mustTrueCompares) {
+            TblColRef col = comp.getColumn();
+            
+            DimensionRangeInfo dimRangeInfo = segDimRangInfoMap.get(col.getIdentity());
+            if (dimRangeInfo == null)
+                dimRangeInfo = tryDeduceRangeFromPartitionCol(seg, col);
+            if (dimRangeInfo == null)
+                continue;
+            
+            String minVal = dimRangeInfo.getMin();
+            String maxVal = dimRangeInfo.getMax();
+            
+            if (!satisfy(comp, minVal, maxVal)) {
+                logger.debug("Prune segment {} due to given filter", seg);
+                return false;
+            }
+        }
+
+        logger.debug("Pruner passed on segment {}", seg);
+        return true;
+    }
+
+    private DimensionRangeInfo tryDeduceRangeFromPartitionCol(CubeSegment seg, TblColRef col) {
+        DataModelDesc model = seg.getModel();
+        PartitionDesc part = model.getPartitionDesc();
+        
+        if (!part.isPartitioned())
+            return null;
+        if (!col.equals(part.getPartitionDateColumnRef()))
+            return null;
+        
+        // deduce the dim range from TSRange
+        TSRange tsRange = seg.getTSRange();
+        if (tsRange.start.isMin || tsRange.end.isMax)
+            return null; // DimensionRangeInfo cannot express infinite
+        
+        String min = tsRangeToStr(tsRange.start.v, part);
+        String max = tsRangeToStr(tsRange.end.v - 1, part); // note the -1, end side is exclusive
+        return new DimensionRangeInfo(min, max);
+    }
+
+    private String tsRangeToStr(long ts, PartitionDesc part) {
+        String value;
+        DataType partitionColType = part.getPartitionDateColumnRef().getType();
+        if (partitionColType.isDate()) {
+            value = DateFormat.formatToDateStr(ts);
+        } else if (partitionColType.isTimeFamily()) {
+            value = DateFormat.formatToTimeWithoutMilliStr(ts);
+        } else if (partitionColType.isStringFamily() || partitionColType.isIntegerFamily()) {//integer like 20160101
+            String partitionDateFormat = part.getPartitionDateFormat();
+            if (StringUtils.isEmpty(partitionDateFormat)) {
+                value = "" + ts;
+            } else {
+                value = DateFormat.formatToDateStr(ts, partitionDateFormat);
+            }
+        } else {
+            throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+        }
+        return value;
+    }
+
+    private boolean satisfy(CompareTupleFilter comp, String minVal, String maxVal) {
+
+        // When both min and max are null, it means all cells of the column are null.
+        // In such case, return true to let query engine scan the segment, since the
+        // result of null comparison is query engine specific.
+        if (minVal == null && maxVal == null)
+            return true;
+        
+        // pass on non-constant filter
+        if (comp.getChildren().size() > 1 && !(comp.getChildren().get(1) instanceof ConstantTupleFilter))
+            return true;
+
+        TblColRef col = comp.getColumn();
+        DataTypeOrder order = col.getType().getOrder();
+        String filterVal = toString(comp.getFirstValue());
+        
+        switch (comp.getOperator()) {
+        case EQ:
+        case IN:
+            String filterMin = order.min((Set<String>) comp.getValues());
+            String filterMax = order.max((Set<String>) comp.getValues());
+            return order.compare(filterMin, maxVal) <= 0 && order.compare(minVal, filterMax) <= 0;
+        case LT:
+            return order.compare(minVal, filterVal) < 0;
+        case LTE:
+            return order.compare(minVal, filterVal) <= 0;
+        case GT:
+            return order.compare(maxVal, filterVal) > 0;
+        case GTE:
+            return order.compare(maxVal, filterVal) >= 0;
+        case NEQ:
+        case NOTIN:
+        case ISNULL:
+        case ISNOTNULL:
+        default:
+            return true;
+        }
+    }
+
+    private String toString(Object v) {
+        return v == null ? null : v.toString();
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
index 811d512..97e4dc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/ScanRangePlannerBase.java
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.metadata.expression.TupleExpression;
@@ -49,14 +48,13 @@ public abstract class ScanRangePlannerBase {
     //GT 
     protected GTInfo gtInfo;
     protected TupleFilter gtFilter;
-    protected Pair<ByteArray, ByteArray> gtStartAndEnd;
-    protected TblColRef gtPartitionCol;
     protected ImmutableBitSet gtDimensions;
     protected ImmutableBitSet gtAggrGroups;
     protected ImmutableBitSet gtAggrMetrics;
     protected String[] gtAggrFuncs;
     protected TupleFilter havingFilter;
     protected boolean isPartitionColUsingDatetimeEncoding = true;
+    protected int onlyShardId = -1;
 
     protected RecordComparator rangeStartComparator;
     protected RecordComparator rangeEndComparator;
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
index fc34f37..2ff7ee0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InputConverterUnitForRawData.java
@@ -43,7 +43,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(InputConverterUnitForRawData.class);
     
-    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
     public static final String[] END_ROW = new String[0];
     public static final String[] CUT_ROW = { "" };
 
@@ -149,7 +148,6 @@ public class InputConverterUnitForRawData implements InputConverterUnit<String[]
 
     private void initNullBytes(CubeDesc cubeDesc) {
         nullBytes = Lists.newArrayList();
-        nullBytes.add(HIVE_NULL);
         String[] nullStrings = cubeDesc.getNullStrings();
         if (nullStrings != null) {
             for (String s : nullStrings) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
deleted file mode 100644
index 3f4f6f4..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.kv;
-
-import java.util.Collection;
-import java.util.Comparator;
-
-import org.apache.kylin.metadata.datatype.DataType;
-
-/**
- * @author yangli9
- */
-abstract public class RowKeyColumnOrder implements Comparator<String> {
-
-    public static final NumberOrder NUMBER_ORDER = new NumberOrder();
-    public static final StringOrder STRING_ORDER = new StringOrder();
-
-    public static RowKeyColumnOrder getInstance(DataType type) {
-        if (type.isNumberFamily() || type.isDateTimeFamily())
-            return NUMBER_ORDER;
-        else
-            return STRING_ORDER;
-    }
-
-    public String max(Collection<String> values) {
-        String max = null;
-        for (String v : values) {
-            if (max == null || compare(max, v) < 0)
-                max = v;
-        }
-        return max;
-    }
-
-    public String min(Collection<String> values) {
-        String min = null;
-        for (String v : values) {
-            if (min == null || compare(min, v) > 0)
-                min = v;
-        }
-        return min;
-    }
-
-    public String min(String v1, String v2) {
-        if (v1 == null)
-            return v2;
-        else if (v2 == null)
-            return v1;
-        else
-            return compare(v1, v2) <= 0 ? v1 : v2;
-    }
-
-    public String max(String v1, String v2) {
-        if (v1 == null)
-            return v2;
-        else if (v2 == null)
-            return v1;
-        else
-            return compare(v1, v2) >= 0 ? v1 : v2;
-    }
-
-    @Override
-    public int compare(String o1, String o2) {
-        // consider null
-        if (o1 == o2)
-            return 0;
-        if (o1 == null)
-            return -1;
-        if (o2 == null)
-            return 1;
-
-        return compareNonNull(o1, o2);
-    }
-
-    abstract int compareNonNull(String o1, String o2);
-
-    private static class StringOrder extends RowKeyColumnOrder {
-        @Override
-        public int compareNonNull(String o1, String o2) {
-            return o1.compareTo(o2);
-        }
-    }
-
-    private static class NumberOrder extends RowKeyColumnOrder {
-        @Override
-        public int compareNonNull(String o1, String o2) {
-            double d1 = Double.parseDouble(o1);
-            double d2 = Double.parseDouble(o2);
-            return Double.compare(d1, d2);
-        }
-    }
-
-}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 2ab7aac..467a294 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.common.util.BytesSplitter;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.DataModelDesc;
@@ -147,16 +146,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, Serializab
         return factColumns;
     }
 
-    // sanity check the input record (in bytes) matches what's expected
-    public void sanityCheck(BytesSplitter bytesSplitter) {
-        if (columnCount != bytesSplitter.getBufferSize()) {
-            throw new IllegalArgumentException("Expect " + columnCount + " columns, but see "
-                    + bytesSplitter.getBufferSize() + " -- " + bytesSplitter);
-        }
-
-        // TODO: check data types here
+    public int getColumnCount() {
+        return columnCount;
     }
-
+    
     @Override
     public String getTableName() {
         return tableName;
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
new file mode 100644
index 0000000..43175c9
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/DimensionRangeInfoTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DimensionRangeInfoTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMergeRangeMap() {
+        DataModelDesc model = DataModelManager.getInstance(getTestConfig()).getDataModelDesc("ci_inner_join_model");
+        String colId = "TEST_KYLIN_FACT.CAL_DT";
+
+        // normal merge
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+            m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+            DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+            Assert.assertEquals("2012-01-01", r1.getMin());
+            Assert.assertEquals("2013-06-30", r1.getMax());
+        }
+        
+        // missing column on one side
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo("2012-01-01", "2012-05-31"));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+
+            Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m1, m2).isEmpty());
+            Assert.assertTrue(DimensionRangeInfo.mergeRangeMap(model, m2, m1).isEmpty());
+        }
+        
+        // null min/max value (happens on empty segment, or all-null columns)
+        {
+            Map<String, DimensionRangeInfo> m1 = new HashMap<>();
+            m1.put(colId, new DimensionRangeInfo(null, null));
+
+            Map<String, DimensionRangeInfo> m2 = new HashMap<>();
+            m2.put(colId, new DimensionRangeInfo("2012-06-01", "2013-06-30"));
+
+            DimensionRangeInfo r1 = DimensionRangeInfo.mergeRangeMap(model, m1, m2).get(colId);
+            Assert.assertEquals("2012-06-01", r1.getMin());
+            Assert.assertEquals("2013-06-30", r1.getMax());
+        }
+        
+    }
+}
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
new file mode 100644
index 0000000..603db97
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/common/SegmentPrunerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.common;
+
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.common.util.SetAndUnsetSystemProp;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class SegmentPrunerTest extends LocalFileMetadataTestCase {
+    private CubeInstance cube;
+
+    @Before
+    public void setUp() {
+        this.createTestMetadata();
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("ssb_cube_with_dimention_range");
+    }
+
+    @After
+    public void after() {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEmptySegment() {
+        CubeSegment seg = cube.getFirstSegment();
+        TblColRef col = cube.getModel().findColumn("CUSTOMER.C_NATION");
+
+        // a normal hit
+        TupleFilter f = compare(col, FilterOperatorEnum.EQ, "CHINA");
+        SegmentPruner segmentPruner = new SegmentPruner(f);
+        Assert.assertTrue(segmentPruner.check(seg));
+
+        // make the segment empty, it should be pruned
+        seg.setInputRecords(0);
+        Assert.assertFalse(segmentPruner.check(seg));
+    }
+
+    @Test
+    public void testDimensionRangeCheck() {
+        CubeSegment cubeSegment = cube.getSegments().getFirstSegment();
+
+        //integer
+        TblColRef qtyCol = cube.getModel().findColumn("V_LINEORDER.LO_QUANTITY");
+        TblColRef revCol = cube.getModel().findColumn("V_LINEORDER.V_REVENUE");
+        
+        TupleFilter constFilter_LO_QUANTITY0 = new ConstantTupleFilter(Sets.newHashSet("8", "18", "28"));//between min and max value
+        TupleFilter constFilter_LO_QUANTITY1 = new ConstantTupleFilter("1");//min value
+        TupleFilter constFilter_LO_QUANTITY2 = new ConstantTupleFilter("50");//max value
+        TupleFilter constFilter_LO_QUANTITY3 = new ConstantTupleFilter("0");//lt min value
+        TupleFilter constFilter_LO_QUANTITY4 = new ConstantTupleFilter("200");//gt max value
+        TupleFilter constFilter_LO_QUANTITY5 = new ConstantTupleFilter(Sets.newHashSet("51", "52", "53"));//gt max values
+
+        // non-constant filter
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.EQ, revCol);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+        
+        // is null
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.ISNULL);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lt min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //lte min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lt max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LT, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //gt max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //gte max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY2);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //gt min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GT, constFilter_LO_QUANTITY1);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //in over-max values
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY5);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //in normal values
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.IN, constFilter_LO_QUANTITY0);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertTrue(segmentPruner.check(cubeSegment));
+        }
+
+        //lte under-min value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.LTE, constFilter_LO_QUANTITY3);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+
+        //gte over-max value
+        {
+            TupleFilter f = compare(qtyCol, FilterOperatorEnum.GTE, constFilter_LO_QUANTITY4);
+            SegmentPruner segmentPruner = new SegmentPruner(f);
+            Assert.assertFalse(segmentPruner.check(cubeSegment));
+        }
+    }
+
+    @Test
+    public void testLegacyCubeSeg() {
+        // legacy cube segments does not have DimensionRangeInfo, but with TSRange can do some pruning
+        CubeInstance cube = CubeManager.getInstance(getTestConfig())
+                .getCube("test_kylin_cube_without_slr_left_join_ready_2_segments");
+
+        TblColRef col = cube.getModel().findColumn("TEST_KYLIN_FACT.CAL_DT");
+        CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
+        TSRange tsRange = seg.getTSRange();
+        long start = tsRange.start.v;
+
+        try (SetAndUnsetSystemProp sns = new SetAndUnsetSystemProp("kylin.query.skip-empty-segments", "false")) {
+            {
+                TupleFilter f = compare(col, FilterOperatorEnum.LTE, start);
+                SegmentPruner segmentPruner = new SegmentPruner(f);
+                Assert.assertTrue(segmentPruner.check(seg));
+            }
+            {
+                TupleFilter f = compare(col, FilterOperatorEnum.LT, start);
+                SegmentPruner segmentPruner = new SegmentPruner(f);
+                Assert.assertFalse(segmentPruner.check(seg));
+            }
+        }
+    }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
index 5ccc1f3..d9eefb5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.metadata.datatype;
 
 import java.io.Serializable;
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -35,7 +34,6 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
 
@@ -162,6 +160,7 @@ public class DataType implements Serializable {
     private String name;
     private int precision;
     private int scale;
+    private transient DataTypeOrder order;
 
     public DataType(String name, int precision, int scale) {
         this.name = name;
@@ -224,24 +223,17 @@ public class DataType implements Serializable {
                 scale = KylinConfig.getInstanceFromEnv().getDefaultDecimalScale();
             }
         }
-
     }
 
+    public DataTypeOrder getOrder() {
+        if (order == null)
+            order = DataTypeOrder.getInstance(this);
+        
+        return order;
+    }
+    
     public int compare(String value1, String value2) {
-        if (isDateTimeFamily()) {
-            Long millis1 = DateFormat.stringToMillis(value1);
-            Long millis2 = DateFormat.stringToMillis(value2);
-            return millis1.compareTo(millis2);
-        } else if (isIntegerFamily()) {
-            Long l1 = new Long(value1);
-            Long l2 = new Long(value2);
-            return l1.compareTo(l2);
-        } else if (isNumberFamily()) {
-            BigDecimal bigDecimal1 = new BigDecimal(value1);
-            BigDecimal bigDecimal2 = new BigDecimal(value2);
-            return bigDecimal1.compareTo(bigDecimal2);
-        }
-        return value1.compareTo(value2);
+        return getOrder().compare(value1,  value2);
     }
 
     private String replaceLegacy(String str) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java
new file mode 100644
index 0000000..091e2ae
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeOrder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.kylin.common.util.DateFormat;
+
+/**
+ * Define order for string literals based on its underlying data type.
+ * 
+ * Null is the smallest.
+ */
+abstract public class DataTypeOrder implements Comparator<String> {
+
+    public static final DataTypeOrder INTEGER_ORDER = new IntegerOrder();
+    public static final DataTypeOrder DOUBLE_ORDER = new DoubleOrder();
+    public static final DataTypeOrder DECIMAL_ORDER = new DecimalOrder();
+    public static final DataTypeOrder DATETIME_ORDER = new DateTimeOrder();
+    public static final DataTypeOrder STRING_ORDER = new StringOrder();
+
+    // package private, access via DataType.getOrder()
+    static DataTypeOrder getInstance(DataType type) throws IllegalArgumentException {
+        if (type.isStringFamily())
+            return STRING_ORDER;
+        else if (type.isDateTimeFamily())
+            return DATETIME_ORDER;
+        else if (type.isIntegerFamily())
+            return INTEGER_ORDER;
+        else if (type.isFloat() || type.isDouble())
+            return DOUBLE_ORDER;
+        else if (type.isDecimal())
+            return DECIMAL_ORDER;
+        else
+            throw new IllegalArgumentException("Unsupported data type " + type);
+    }
+
+    public String max(Collection<String> values) {
+        String max = null;
+        for (String v : values) {
+            max = max(max, v);
+        }
+        return max;
+    }
+
+    public String min(Collection<String> values) {
+        String min = null;
+        for (String v : values) {
+            min = min(min, v);
+        }
+        return min;
+    }
+
+    public String min(String v1, String v2) {
+        if (v1 == null)
+            return v2;
+        else if (v2 == null)
+            return v1;
+        else
+            return compare(v1, v2) <= 0 ? v1 : v2;
+    }
+
+    public String max(String v1, String v2) {
+        if (v1 == null)
+            return v2;
+        else if (v2 == null)
+            return v1;
+        else
+            return compare(v1, v2) >= 0 ? v1 : v2;
+    }
+
+    @Override
+    public int compare(String s1, String s2) {
+        Comparable o1 = toComparable(s1);
+        Comparable o2 = toComparable(s2);
+        
+        // consider null
+        if (o1 == o2)
+            return 0;
+        if (o1 == null)
+            return -1;
+        if (o2 == null)
+            return 1;
+
+        return o1.compareTo(o2);
+    }
+
+    abstract Comparable toComparable(String s);
+
+    private static class StringOrder extends DataTypeOrder {
+        @Override
+        public String toComparable(String s) {
+            return s;
+        }
+    }
+
+    private static class IntegerOrder extends DataTypeOrder {
+        @Override
+        public Long toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return Long.parseLong(s);
+        }
+    }
+
+    private static class DoubleOrder extends DataTypeOrder {
+        @Override
+        public Double toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return Double.parseDouble(s);
+        }
+    }
+    
+    private static class DecimalOrder extends DataTypeOrder {
+        @Override
+        public BigDecimal toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return new BigDecimal(s);
+        }
+    }
+    
+    private static class DateTimeOrder extends DataTypeOrder {
+        @Override
+        public Long toComparable(String s) {
+            if (s == null || s.isEmpty())
+                return null;
+            else
+                return DateFormat.stringToMillis(s);
+        }
+    }
+    
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 16f165a..2c75ec1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -33,7 +33,7 @@ import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
  * @author xjiang
  */
 public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupleFilter {
-
+    
     public enum CompareResultType {
         AlwaysTrue, AlwaysFalse, Unknown
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 09b41f5..672aba0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.filter;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * 
@@ -81,6 +83,44 @@ public abstract class TupleFilter {
         SWAP_OP_MAP.put(FilterOperatorEnum.LT, FilterOperatorEnum.GT);
         SWAP_OP_MAP.put(FilterOperatorEnum.GTE, FilterOperatorEnum.LTE);
     }
+    
+    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op) {
+        CompareTupleFilter r = new CompareTupleFilter(op);
+        r.addChild(new ColumnTupleFilter(col));
+        return r;
+    }
+    
+    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object val) {
+        CompareTupleFilter r = new CompareTupleFilter(op);
+        r.addChild(new ColumnTupleFilter(col));
+        if (val instanceof TupleFilter)
+            r.addChild((TupleFilter) val);
+        else if (val instanceof TblColRef)
+            r.addChild(new ColumnTupleFilter((TblColRef) col));
+        else
+            r.addChild(new ConstantTupleFilter(val));
+        return r;
+    }
+
+    public static LogicalTupleFilter and(TupleFilter... children) {
+        LogicalTupleFilter r = new LogicalTupleFilter(FilterOperatorEnum.AND);
+        r.addChildren(children);
+        return r;
+    }
+    
+    public static LogicalTupleFilter or(TupleFilter... children) {
+        LogicalTupleFilter r = new LogicalTupleFilter(FilterOperatorEnum.OR);
+        r.addChildren(children);
+        return r;
+    }
+    
+    public static LogicalTupleFilter not(TupleFilter child) {
+        LogicalTupleFilter r = new LogicalTupleFilter(FilterOperatorEnum.NOT);
+        r.addChild(child);
+        return r;
+    }
+    
+    // ============================================================================
 
     protected final List<TupleFilter> children;
     protected FilterOperatorEnum operator;
@@ -245,7 +285,61 @@ public abstract class TupleFilter {
         }
         return oldProductFilters;
     }
+    
+    public HashMap<TblColRef, Object> findMustEqualColsAndValues(Collection<TblColRef> lookingForCols) {
+        HashMap<TblColRef, Object> result = new HashMap<>();
+        findMustEqualColsAndValues(this, lookingForCols, result);
+        return result;
+    }
 
+    private void findMustEqualColsAndValues(TupleFilter filter, Collection<TblColRef> lookingForCols, HashMap<TblColRef, Object> result) {
+        if (filter instanceof CompareTupleFilter) {
+            CompareTupleFilter comp = (CompareTupleFilter) filter;
+            TblColRef col = comp.getColumn();
+            if (lookingForCols.contains(col)) {
+                if (comp.getOperator() == FilterOperatorEnum.EQ)
+                    result.put(col, comp.getFirstValue());
+                else if (comp.getOperator() == FilterOperatorEnum.ISNULL)
+                    result.put(col, null);
+            }
+            return;
+        }
+
+        if (filter instanceof LogicalTupleFilter) {
+            LogicalTupleFilter logic = (LogicalTupleFilter) filter;
+            if (logic.getOperator() == FilterOperatorEnum.AND) {
+                for (TupleFilter child : logic.getChildren())
+                    findMustEqualColsAndValues(child, lookingForCols, result);
+            }
+            return;
+        }
+    }
+
+    //find must true compareTupleFilter
+    public Set<CompareTupleFilter> findMustTrueCompareFilters() {
+        Set<CompareTupleFilter> result = Sets.newHashSet();
+        findMustTrueCompareFilters(this, result);
+        return result;
+    }
+
+    private void findMustTrueCompareFilters(TupleFilter filter, Set<CompareTupleFilter> result) {
+        if (filter instanceof CompareTupleFilter) {
+            if (((CompareTupleFilter) filter).getColumn() != null) {
+                result.add((CompareTupleFilter) filter);
+            }
+            return;
+        }
+        
+        if (filter instanceof LogicalTupleFilter) {
+            if (filter.getOperator() == FilterOperatorEnum.AND) {
+                for (TupleFilter child : filter.getChildren()) {
+                    findMustTrueCompareFilters(child, result);
+                }
+            }
+            return;
+        }
+    }
+    
     public abstract boolean isEvaluable();
 
     public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
@@ -284,26 +378,4 @@ public abstract class TupleFilter {
         }
     }
 
-    public static TupleFilter and(TupleFilter f1, TupleFilter f2) {
-        if (f1 == null)
-            return f2;
-        if (f2 == null)
-            return f1;
-
-        if (f1.getOperator() == FilterOperatorEnum.AND) {
-            f1.addChild(f2);
-            return f1;
-        }
-
-        if (f2.getOperator() == FilterOperatorEnum.AND) {
-            f2.addChild(f1);
-            return f2;
-        }
-
-        LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND);
-        and.addChild(f1);
-        and.addChild(f2);
-        return and;
-    }
-
 }
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
new file mode 100644
index 0000000..7666ffe
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/DataTypeOrderTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class DataTypeOrderTest {
+    @Test
+    public void testDataTypeOrder() {
+        DataType intType = DataType.getType("integer");
+        DataTypeOrder dataTypeOrder = intType.getOrder();
+        Set<String> integers = Sets.newHashSet("100000", "2", "1000", "100", "77", "10", "9", "2000000", "-10000", "0");
+        Assert.assertEquals("2000000", dataTypeOrder.max(integers));
+        Assert.assertEquals("-10000", dataTypeOrder.min(integers));
+
+        DataType doubleType = DataType.getType("double");
+        dataTypeOrder = doubleType.getOrder();
+        Set<String> doubels = Sets.newHashSet("1.1", "-299.5", "100000", "1.000", "4.000000001", "0.00", "-1000000.231231", "8000000",
+                "10", "10.00");
+        Assert.assertEquals("8000000", dataTypeOrder.max(doubels));
+        Assert.assertEquals("-1000000.231231", dataTypeOrder.min(doubels));
+
+        DataType datetimeType = DataType.getType("date");
+        dataTypeOrder = datetimeType.getOrder();
+        Set<String> datetimes = Sets.newHashSet("2010-01-02", "2888-08-09", "2018-05-26", "1527512082000", "2010-02-03 23:59:59",
+                "2000-12-12 12:00:00", "1970-01-19 00:18:32", "1998-12-02", "2018-05-28 10:00:00.255", "1995-09-20 20:00:00.220");
+        Assert.assertEquals("2888-08-09", dataTypeOrder.max(datetimes));
+        Assert.assertEquals("1970-01-19 00:18:32", dataTypeOrder.min(datetimes));
+
+        DataType stringType = new DataType("varchar", 256, 10);
+        dataTypeOrder = stringType.getOrder();
+        Set<String> strings = Sets.newHashSet(null, "", "中国", "China No.1", "神兽麒麟", "Rocket", "Apache Kylin", "google", "NULL",
+                "empty");
+        Assert.assertEquals("神兽麒麟", dataTypeOrder.max(strings));
+        Assert.assertEquals("", dataTypeOrder.min(strings));
+    }
+}
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
index e17f4d1..95609eb 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/filter/TupleFilterTest.java
@@ -18,9 +18,21 @@
 
 package org.apache.kylin.metadata.filter;
 
+import static org.apache.kylin.metadata.filter.TupleFilter.and;
+import static org.apache.kylin.metadata.filter.TupleFilter.compare;
+import static org.apache.kylin.metadata.filter.TupleFilter.not;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class TupleFilterTest {
     @Test
     // true ==> true
@@ -62,4 +74,69 @@ public class TupleFilterTest {
         andFilter2.addChildren(ConstantTupleFilter.FALSE, ConstantTupleFilter.FALSE);
         Assert.assertEquals(andFilter2, andFilter.removeNot());
     }
+    
+    @Test
+    public void testFindMustEqualColsAndValues() {
+        TableDesc tbl = TableDesc.mockup("mockup_table");
+        TblColRef colA = TblColRef.mockup(tbl, 0, "A", "bigint");
+        TblColRef colB = TblColRef.mockup(tbl, 1, "B", "char(256)");
+        Set<TblColRef> cols = Sets.newHashSet(colA, colB);
+        
+        {
+            TupleFilter f = compare(colA, FilterOperatorEnum.EQ, "1234");
+            Assert.assertEquals(map(colA, "1234"), f.findMustEqualColsAndValues(cols));
+        }
+        
+        {
+            TupleFilter f = compare(colA, FilterOperatorEnum.ISNULL);
+            Assert.assertEquals(map(colA, null), f.findMustEqualColsAndValues(cols));
+        }
+        
+        {
+            TupleFilter f = and(compare(colA, FilterOperatorEnum.ISNULL), compare(colB, FilterOperatorEnum.EQ, "1234"));
+            Assert.assertEquals(map(colA, null, colB, "1234"), f.findMustEqualColsAndValues(cols));
+            Assert.assertTrue(not(f).findMustEqualColsAndValues(cols).isEmpty());
+        }
+        
+        {
+            TupleFilter f = compare(colA, FilterOperatorEnum.LT, "1234");
+            Assert.assertTrue(f.findMustEqualColsAndValues(cols).isEmpty());
+        }
+    }
+
+    private HashMap<TblColRef, Object> map(TblColRef col, String v) {
+        HashMap<TblColRef, Object> r = new HashMap<>();
+        r.put(col, v);
+        return r;
+    }
+    
+    private HashMap<TblColRef, Object> map(TblColRef col, String v, TblColRef col2, String v2) {
+        HashMap<TblColRef, Object> r = new HashMap<>();
+        r.put(col, v);
+        r.put(col2, v2);
+        return r;
+    }
+
+    @Test
+    public void testMustTrueTupleFilter() {
+        TupleFilter andFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+        TupleFilter andFilter2  = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+        TupleFilter orFilter = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.OR);
+        andFilter.addChild(andFilter2);
+        andFilter.addChild(orFilter);
+
+        Set<CompareTupleFilter> trueTupleFilters = andFilter.findMustTrueCompareFilters();
+        Assert.assertTrue(trueTupleFilters.isEmpty());
+
+        TupleFilter compFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        compFilter.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test1", TblColRef.InnerDataTypeEnum.LITERAL)));
+        TupleFilter compFilter2 = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.GT);
+        compFilter2.addChild(new ColumnTupleFilter(TblColRef.newInnerColumn("test2", TblColRef.InnerDataTypeEnum.LITERAL)));
+        andFilter2.addChild(compFilter);
+        orFilter.addChild(compFilter2);
+        Assert.assertEquals(Sets.newHashSet(compFilter), andFilter.findMustTrueCompareFilters());
+        
+        Assert.assertEquals(Sets.newHashSet(compFilter2), compFilter2.findMustTrueCompareFilters());
+    }
+    
 }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 5f86a45..229ef01 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -31,7 +31,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.FuzzyValueCombination;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -39,7 +38,6 @@ import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.RecordComparators;
 import org.apache.kylin.cube.gridtable.ScanRangePlannerBase;
-import org.apache.kylin.cube.gridtable.SegmentGTStartAndEnd;
 import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTInfo;
@@ -137,15 +135,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.gtDynColumns = new ImmutableBitSet(tmpGtDynCols);
         this.gtRtAggrMetrics = mapping.makeGridTableColumns(tmpRtAggrMetrics);
 
-        if (cubeSegment.getModel().getPartitionDesc().isPartitioned()) {
-            int index = mapping.getIndexOf(cubeSegment.getModel().getPartitionDesc().getPartitionDateColumnRef());
-            if (index >= 0) {
-                SegmentGTStartAndEnd segmentGTStartAndEnd = new SegmentGTStartAndEnd(cubeSegment, gtInfo);
-                this.gtStartAndEnd = segmentGTStartAndEnd.getSegmentStartAndEnd(index);
-                this.isPartitionColUsingDatetimeEncoding = segmentGTStartAndEnd.isUsingDatetimeEncoding(index);
-                this.gtPartitionCol = gtInfo.colRef(index);
-            }
-        }
+        this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
+        this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
+        this.gtAggrFuncs = mapping.makeAggrFuncs(metrics);
     }
 
     protected StorageContext context;
@@ -153,7 +145,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     /**
      * Construct  GTScanRangePlanner with incomplete information. For UT only.
      */
-    public CubeScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> gtStartAndEnd, TblColRef gtPartitionCol, TupleFilter gtFilter) {
+    public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
         this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
         this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
@@ -170,8 +162,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
 
         this.gtFilter = gtFilter;
-        this.gtStartAndEnd = gtStartAndEnd;
-        this.gtPartitionCol = gtPartitionCol;
     }
 
     public GTScanRequest planScanRequest() {
@@ -237,18 +227,6 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap();
 
         for (ColumnRange range : andDimRanges) {
-            if (gtPartitionCol != null && range.column.equals(gtPartitionCol)) {
-                int beginCompare = rangeStartEndComparator.comparator.compare(range.begin, gtStartAndEnd.getSecond());
-                int endCompare = rangeStartEndComparator.comparator.compare(gtStartAndEnd.getFirst(), range.end);
-
-                if ((isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare < 0) || (!isPartitionColUsingDatetimeEncoding && endCompare <= 0 && beginCompare <= 0)) {
-                    //segment range is [Closed,Open), but segmentStartAndEnd.getSecond() might be rounded when using dict encoding, so use <= when has equals in condition. 
-                } else {
-                    logger.debug("Pre-check partition col filter failed, partitionColRef {}, segment start {}, segment end {}, range begin {}, range end {}", //
-                            gtPartitionCol, makeReadable(gtStartAndEnd.getFirst()), makeReadable(gtStartAndEnd.getSecond()), makeReadable(range.begin), makeReadable(range.end));
-                    return null;
-                }
-            }
 
             int col = range.column.getColumnDesc().getZeroBasedIndex();
             if (!gtInfo.getPrimaryKey().get(col))
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 352a868..269833f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.RawQueryLastHacker;
+import org.apache.kylin.cube.common.SegmentPruner;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
@@ -87,14 +88,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         GTCubeStorageQueryRequest request = getStorageQueryRequest(context, sqlDigest, returnTupleInfo);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
-        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+        SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
+        for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
             CubeSegmentScanner scanner;
 
-            if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) {
-                logger.info("Skip cube segment {} because its input record is 0", cubeSeg);
-                continue;
-            }
-
             scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
                     request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
                     request.getMetrics(), request.getDynFuncs(), //
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
deleted file mode 100644
index 56b1106..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Sets;
-
-/**
- * 
- * @author xjiang
- * 
- */
-public class ColumnValueRange {
-    private TblColRef column;
-    private RowKeyColumnOrder order;
-    private String beginValue;
-    private String endValue;
-    private Set<String> equalValues;
-
-    public ColumnValueRange(TblColRef column, Collection<String> values, FilterOperatorEnum op) {
-        this.column = column;
-        this.order = RowKeyColumnOrder.getInstance(column.getType());
-
-        switch (op) {
-        case EQ:
-        case IN:
-            equalValues = new HashSet<String>(values);
-            refreshBeginEndFromEquals();
-            break;
-        case LT:
-        case LTE:
-            endValue = order.max(values);
-            break;
-        case GT:
-        case GTE:
-            beginValue = order.min(values);
-            break;
-        case NEQ:
-        case NOTIN:
-        case ISNULL: // TODO ISNULL worth pass down as a special equal value
-        case ISNOTNULL:
-            // let Optiq filter it!
-            break;
-        default:
-            throw new UnsupportedOperationException(op.name());
-        }
-    }
-
-    public ColumnValueRange(TblColRef column, String beginValue, String endValue, Set<String> equalValues) {
-        copy(column, beginValue, endValue, equalValues);
-    }
-
-    void copy(TblColRef column, String beginValue, String endValue, Set<String> equalValues) {
-        this.column = column;
-        this.order = RowKeyColumnOrder.getInstance(column.getType());
-        this.beginValue = beginValue;
-        this.endValue = endValue;
-        this.equalValues = equalValues;
-    }
-
-    public TblColRef getColumn() {
-        return column;
-    }
-
-    public String getBeginValue() {
-        return beginValue;
-    }
-
-    public String getEndValue() {
-        return endValue;
-    }
-
-    public Set<String> getEqualValues() {
-        return equalValues;
-    }
-
-    private void refreshBeginEndFromEquals() {
-        this.beginValue = order.min(this.equalValues);
-        this.endValue = order.max(this.equalValues);
-    }
-
-    public boolean satisfyAll() {
-        return beginValue == null && endValue == null && equalValues == null; // the NEQ case
-    }
-
-    public boolean satisfyNone() {
-        if (equalValues != null) {
-            return equalValues.isEmpty();
-        } else if (beginValue != null && endValue != null) {
-            return order.compare(beginValue, endValue) > 0;
-        } else {
-            return false;
-        }
-    }
-
-    public void andMerge(ColumnValueRange another) {
-        assert this.column.equals(another.column);
-
-        if (another.satisfyAll()) {
-            return;
-        }
-
-        if (this.satisfyAll()) {
-            copy(another.column, another.beginValue, another.endValue, another.equalValues);
-            return;
-        }
-
-        if (this.equalValues != null && another.equalValues != null) {
-            this.equalValues.retainAll(another.equalValues);
-            refreshBeginEndFromEquals();
-            return;
-        }
-
-        if (this.equalValues != null) {
-            this.equalValues = filter(this.equalValues, another.beginValue, another.endValue);
-            refreshBeginEndFromEquals();
-            return;
-        }
-
-        if (another.equalValues != null) {
-            this.equalValues = filter(another.equalValues, this.beginValue, this.endValue);
-            refreshBeginEndFromEquals();
-            return;
-        }
-
-        this.beginValue = order.max(this.beginValue, another.beginValue);
-        this.endValue = order.min(this.endValue, another.endValue);
-    }
-
-    private Set<String> filter(Set<String> equalValues, String beginValue, String endValue) {
-        Set<String> result = Sets.newHashSetWithExpectedSize(equalValues.size());
-        for (String v : equalValues) {
-            if (between(v, beginValue, endValue)) {
-                result.add(v);
-            }
-        }
-        return equalValues;
-    }
-
-    private boolean between(String v, String beginValue, String endValue) {
-        return (beginValue == null || order.compare(beginValue, v) <= 0) && (endValue == null || order.compare(v, endValue) <= 0);
-    }
-
-    // remove invalid EQ/IN values and round start/end according to dictionary
-    public void preEvaluateWithDict(Dictionary<String> dict) {
-        if (dict == null || dict.getSize() == 0)
-            return;
-
-        if (equalValues != null) {
-            Iterator<String> it = equalValues.iterator();
-            while (it.hasNext()) {
-                String v = it.next();
-                try {
-                    dict.getIdFromValue(v);
-                } catch (IllegalArgumentException e) {
-                    // value not in dictionary
-                    it.remove();
-                }
-            }
-            refreshBeginEndFromEquals();
-        }
-
-        if (beginValue != null) {
-            try {
-                beginValue = dict.getValueFromId(dict.getIdFromValue(beginValue, 1));
-            } catch (IllegalArgumentException e) {
-                // beginValue is greater than the biggest in dictionary, mark FALSE
-                equalValues = Sets.newHashSet();
-            }
-        }
-
-        if (endValue != null) {
-            try {
-                endValue = dict.getValueFromId(dict.getIdFromValue(endValue, -1));
-            } catch (IllegalArgumentException e) {
-                // endValue is lesser than the smallest in dictionary, mark FALSE
-                equalValues = Sets.newHashSet();
-            }
-        }
-    }
-
-    public String toString() {
-        if (equalValues == null) {
-            return column.getName() + " between " + beginValue + " and " + endValue;
-        } else {
-            return column.getName() + " in " + equalValues;
-        }
-    }
-}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index 7fa426f..4a80d29 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -24,10 +24,10 @@ import java.util.Set;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.kv.RowKeyColumnOrder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
 import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -164,9 +164,9 @@ public class DerivedFilterTranslator {
 
     private static void findMinMax(Set<Array<String>> satisfyingHostRecords, TblColRef[] hostCols, String[] min, String[] max) {
 
-        RowKeyColumnOrder[] orders = new RowKeyColumnOrder[hostCols.length];
+        DataTypeOrder[] orders = new DataTypeOrder[hostCols.length];
         for (int i = 0; i < hostCols.length; i++) {
-            orders[i] = RowKeyColumnOrder.getInstance(hostCols[i].getType());
+            orders[i] = hostCols[i].getType().getOrder();
         }
 
         for (Array<String> rec : satisfyingHostRecords) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
deleted file mode 100644
index 85678ac..0000000
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.translate;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.FuzzyKeyEncoder;
-import org.apache.kylin.cube.kv.FuzzyMaskEncoder;
-import org.apache.kylin.cube.kv.LazyRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * @author xjiang
- */
-public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
-
-    private static final Logger logger = LoggerFactory.getLogger(HBaseKeyRange.class);
-
-    private static final int FUZZY_VALUE_CAP = 100;
-    private static final byte[] ZERO_TAIL_BYTES = new byte[] { 0 };
-
-    private final CubeSegment cubeSeg;
-    private final Cuboid cuboid;
-    private final List<Collection<ColumnValueRange>> flatOrAndFilter; // OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
-
-    private byte[] startKey;
-    private byte[] stopKey;
-    private List<Pair<byte[], byte[]>> fuzzyKeys;
-
-    private String startKeyString;
-    private String stopKeyString;
-    private String fuzzyKeyString;
-
-    private long partitionColumnStartDate = Long.MIN_VALUE;
-    private long partitionColumnEndDate = Long.MAX_VALUE;
-
-    public HBaseKeyRange(CubeSegment cubeSeg, Cuboid cuboid, byte[] startKey, byte[] stopKey, List<Pair<byte[], byte[]>> fuzzyKeys, List<Collection<ColumnValueRange>> flatColumnValueFilter, long partitionColumnStartDate, long partitionColumnEndDate) {
-        this.cubeSeg = cubeSeg;
-        this.cuboid = cuboid;
-        this.startKey = startKey;
-        this.stopKey = stopKey;
-        this.fuzzyKeys = fuzzyKeys;
-        this.flatOrAndFilter = flatColumnValueFilter;
-        this.partitionColumnStartDate = partitionColumnStartDate;
-        this.partitionColumnEndDate = partitionColumnEndDate;
-        initDebugString();
-    }
-
-    public HBaseKeyRange(Collection<TblColRef> dimensionColumns, Collection<ColumnValueRange> andDimensionRanges, CubeSegment cubeSeg, CubeDesc cubeDesc) {
-        this.cubeSeg = cubeSeg;
-        long cuboidId = this.calculateCuboidID(cubeDesc, dimensionColumns);
-        this.cuboid = Cuboid.findById(cubeSeg, cuboidId);
-        this.flatOrAndFilter = Lists.newLinkedList();
-        this.flatOrAndFilter.add(andDimensionRanges);
-        init(andDimensionRanges);
-        initDebugString();
-    }
-
-    private long calculateCuboidID(CubeDesc cube, Collection<TblColRef> dimensions) {
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cube.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return cuboidID;
-    }
-
-    private void init(Collection<ColumnValueRange> andDimensionRanges) {
-        int size = andDimensionRanges.size();
-        Map<TblColRef, String> startValues = Maps.newHashMapWithExpectedSize(size);
-        Map<TblColRef, String> stopValues = Maps.newHashMapWithExpectedSize(size);
-        Map<TblColRef, Set<String>> fuzzyValues = Maps.newHashMapWithExpectedSize(size);
-        for (ColumnValueRange dimRange : andDimensionRanges) {
-            TblColRef column = dimRange.getColumn();
-            startValues.put(column, dimRange.getBeginValue());
-            stopValues.put(column, dimRange.getEndValue());
-            fuzzyValues.put(column, dimRange.getEqualValues());
-
-            TblColRef partitionDateColumnRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (column.equals(partitionDateColumnRef)) {
-                initPartitionRange(dimRange);
-            }
-        }
-
-        AbstractRowKeyEncoder encoder = new LazyRowKeyEncoder(cubeSeg, cuboid);
-        encoder.setBlankByte(RowConstants.ROWKEY_LOWER_BYTE);
-        this.startKey = encoder.encode(startValues);
-        encoder.setBlankByte(RowConstants.ROWKEY_UPPER_BYTE);
-        // In order to make stopRow inclusive add a trailing 0 byte. #See Scan.setStopRow(byte [] stopRow)
-        this.stopKey = Bytes.add(encoder.encode(stopValues), ZERO_TAIL_BYTES);
-
-        // always fuzzy match cuboid ID to lock on the selected cuboid
-        this.fuzzyKeys = buildFuzzyKeys(fuzzyValues);
-    }
-
-    private void initPartitionRange(ColumnValueRange dimRange) {
-        if (null != dimRange.getBeginValue()) {
-            this.partitionColumnStartDate = DateFormat.stringToMillis(dimRange.getBeginValue());
-        }
-        if (null != dimRange.getEndValue()) {
-            this.partitionColumnEndDate = DateFormat.stringToMillis(dimRange.getEndValue());
-        }
-    }
-
-    private void initDebugString() {
-        this.startKeyString = BytesUtil.toHex(this.startKey);
-        this.stopKeyString = BytesUtil.toHex(this.stopKey);
-        StringBuilder buf = new StringBuilder();
-        for (Pair<byte[], byte[]> fuzzyKey : this.fuzzyKeys) {
-            buf.append(BytesUtil.toHex(fuzzyKey.getFirst()));
-            buf.append(" ");
-            buf.append(BytesUtil.toHex(fuzzyKey.getSecond()));
-            buf.append(";");
-        }
-        this.fuzzyKeyString = buf.toString();
-    }
-
-    private List<Pair<byte[], byte[]>> buildFuzzyKeys(Map<TblColRef, Set<String>> fuzzyValueSet) {
-        ArrayList<Pair<byte[], byte[]>> result = new ArrayList<Pair<byte[], byte[]>>();
-
-        // debug/profiling purpose
-        if (BackdoorToggles.getDisableFuzzyKey()) {
-            logger.info("The execution of this query will not use fuzzy key");
-            return result;
-        }
-
-        FuzzyKeyEncoder fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
-        FuzzyMaskEncoder fuzzyMaskEncoder = new FuzzyMaskEncoder(cubeSeg, cuboid);
-
-        List<Map<TblColRef, String>> fuzzyValues = FuzzyValueCombination.calculate(fuzzyValueSet, FUZZY_VALUE_CAP);
-        for (Map<TblColRef, String> fuzzyValue : fuzzyValues) {
-            result.add(Pair.newPair(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue)));
-        }
-        return result;
-    }
-
-    public CubeSegment getCubeSegment() {
-        return this.cubeSeg;
-    }
-
-    public Cuboid getCuboid() {
-        return cuboid;
-    }
-
-    public byte[] getStartKey() {
-        return startKey;
-    }
-
-    public byte[] getStopKey() {
-        return stopKey;
-    }
-
-    public List<Pair<byte[], byte[]>> getFuzzyKeys() {
-        return fuzzyKeys;
-    }
-
-    public String getStartKeyAsString() {
-        return startKeyString;
-    }
-
-    public String getStopKeyAsString() {
-        return stopKeyString;
-    }
-
-    public String getFuzzyKeyAsString() {
-        return fuzzyKeyString;
-    }
-
-    public List<Collection<ColumnValueRange>> getFlatOrAndFilter() {
-        return flatOrAndFilter;
-    }
-
-    public long getPartitionColumnStartDate() {
-        return partitionColumnStartDate;
-    }
-
-    public long getPartitionColumnEndDate() {
-        return partitionColumnEndDate;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((cubeSeg == null) ? 0 : cubeSeg.hashCode());
-        result = prime * result + ((cuboid == null) ? 0 : cuboid.hashCode());
-        result = prime * result + ((fuzzyKeyString == null) ? 0 : fuzzyKeyString.hashCode());
-        result = prime * result + ((startKeyString == null) ? 0 : startKeyString.hashCode());
-        result = prime * result + ((stopKeyString == null) ? 0 : stopKeyString.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        HBaseKeyRange other = (HBaseKeyRange) obj;
-        if (cubeSeg == null) {
-            if (other.cubeSeg != null)
-                return false;
-        } else if (!cubeSeg.equals(other.cubeSeg))
-            return false;
-        if (cuboid == null) {
-            if (other.cuboid != null)
-                return false;
-        } else if (!cuboid.equals(other.cuboid))
-            return false;
-        if (fuzzyKeyString == null) {
-            if (other.fuzzyKeyString != null)
-                return false;
-        } else if (!fuzzyKeyString.equals(other.fuzzyKeyString))
-            return false;
-        if (startKeyString == null) {
-            if (other.startKeyString != null)
-                return false;
-        } else if (!startKeyString.equals(other.startKeyString))
-            return false;
-        if (stopKeyString == null) {
-            if (other.stopKeyString != null)
-                return false;
-        } else if (!stopKeyString.equals(other.stopKeyString))
-            return false;
-        return true;
-    }
-
-    @Override
-    public int compareTo(HBaseKeyRange other) {
-        return Bytes.compareTo(this.startKey, other.startKey);
-    }
-
-    public boolean hitSegment() {
-        return cubeSeg.getTSRange().start.v <= getPartitionColumnEndDate() && cubeSeg.getTSRange().end.v >= getPartitionColumnStartDate();
-    }
-}
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 073c12c..08bcb65 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -32,7 +32,6 @@ import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
 import org.apache.kylin.dict.NumberDictionaryForestBuilder;
 import org.apache.kylin.dict.StringBytesConverter;
@@ -121,13 +120,11 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         ByteArray segmentStart = enc(info, 0, "2015-01-14");
         ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
-        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
         assertEquals(segmentStart, segmentStartX);
 
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -136,64 +133,57 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         }
         {
             LogicalTupleFilter filter = and(timeComp2, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp4, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
+            assertEquals(1, r.size());
         }
         {
             LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
                     and(timeComp6, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[1421193600000, 10]-[null, 10]", r.get(0).toString());
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
             assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-                    r.get(0).fuzzyKeys.toString());
+                    r.get(1).fuzzyKeys.toString());
         }
         {
             LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
         }
         {
             LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[1421193600000, null]-[null, null]", r.get(0).toString());
-            assertEquals(0, r.get(0).fuzzyKeys.size());
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+            assertEquals(0, r.get(1).fuzzyKeys.size());
         }
         {
             //skip FALSE filter
             LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
         {
             //TRUE or FALSE filter
             LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -201,8 +191,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             //TRUE or other filter
             LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(segmentStart, segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[null, null]-[null, null]", r.get(0).toString());
@@ -211,12 +200,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void verifySegmentSkipping2() {
-        ByteArray segmentEnd = enc(info, 0, "2015-01-15");
-
         {
             LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());//scan range are [close,close]
             assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
@@ -226,10 +212,9 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
         {
             LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, Pair.newPair(new ByteArray(), segmentEnd),
-                    info.colRef(0), filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
             List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());//scan range are [close,close]
+            assertEquals(1, r.size());//scan range are [close,close]
         }
     }
 
@@ -239,7 +224,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // flatten or-and & hbase fuzzy value
         {
             LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(1, r.size());
             assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
@@ -249,7 +234,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // pre-evaluate ever false
         {
             LogicalTupleFilter filter = and(timeComp1, timeComp2);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(0, r.size());
         }
@@ -257,7 +242,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // pre-evaluate ever true
         {
             LogicalTupleFilter filter = or(timeComp1, ageComp4);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
@@ -265,7 +250,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         // merge overlap range
         {
             LogicalTupleFilter filter = or(timeComp1, timeComp3);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals("[[null, null]-[null, null]]", r.toString());
         }
@@ -274,7 +259,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         {
             LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
                     and(timeComp4, ageComp3));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, null, filter);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
             List<GTScanRange> r = planner.planScanRanges();
             assertEquals(3, r.size());
             assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java b/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
deleted file mode 100644
index 0e7e91f..0000000
--- a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.translate;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.dict.TrieDictionaryBuilder;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class ColumnValueRangeTest extends LocalFileMetadataTestCase {
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        staticCreateTestMetadata();
-    }
-
-    @AfterClass
-    public static void after() throws Exception {
-        cleanAfterClass();
-    }
-
-    @Test
-    public void testPreEvaluateWithDict() {
-        TblColRef col = mockupTblColRef();
-        Dictionary<String> dict = mockupDictionary(col, "CN", "US");
-
-        ColumnValueRange r1 = new ColumnValueRange(col, set("CN", "US", "Other"), FilterOperatorEnum.EQ);
-        r1.preEvaluateWithDict(dict);
-        assertEquals(set("CN", "US"), r1.getEqualValues());
-
-        // less than rounding
-        {
-            ColumnValueRange r2 = new ColumnValueRange(col, set("CN"), FilterOperatorEnum.LT);
-            r2.preEvaluateWithDict(dict);
-            assertEquals(null, r2.getBeginValue());
-            assertEquals("CN", r2.getEndValue());
-
-            ColumnValueRange r3 = new ColumnValueRange(col, set("Other"), FilterOperatorEnum.LT);
-            r3.preEvaluateWithDict(dict);
-            assertEquals(null, r3.getBeginValue());
-            assertEquals("CN", r3.getEndValue());
-
-            ColumnValueRange r4 = new ColumnValueRange(col, set("UT"), FilterOperatorEnum.LT);
-            r4.preEvaluateWithDict(dict);
-            assertEquals(null, r4.getBeginValue());
-            assertEquals("US", r4.getEndValue());
-        }
-
-        // greater than rounding
-        {
-            ColumnValueRange r2 = new ColumnValueRange(col, set("CN"), FilterOperatorEnum.GTE);
-            r2.preEvaluateWithDict(dict);
-            assertEquals("CN", r2.getBeginValue());
-            assertEquals(null, r2.getEndValue());
-
-            ColumnValueRange r3 = new ColumnValueRange(col, set("Other"), FilterOperatorEnum.GTE);
-            r3.preEvaluateWithDict(dict);
-            assertEquals("US", r3.getBeginValue());
-            assertEquals(null, r3.getEndValue());
-
-            ColumnValueRange r4 = new ColumnValueRange(col, set("CI"), FilterOperatorEnum.GTE);
-            r4.preEvaluateWithDict(dict);
-            assertEquals("CN", r4.getBeginValue());
-            assertEquals(null, r4.getEndValue());
-        }
-
-        // ever false check
-        {
-            ColumnValueRange r2 = new ColumnValueRange(col, set("UT"), FilterOperatorEnum.GTE);
-            r2.preEvaluateWithDict(dict);
-            assertTrue(r2.satisfyNone());
-
-            ColumnValueRange r3 = new ColumnValueRange(col, set("CM"), FilterOperatorEnum.LT);
-            r3.preEvaluateWithDict(dict);
-            assertTrue(r3.satisfyNone());
-        }
-    }
-
-    public static Dictionary<String> mockupDictionary(TblColRef col, String... values) {
-        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<String>(new StringBytesConverter());
-        for (String v : values) {
-            builder.addValue(v);
-        }
-        return builder.build(0);
-    }
-
-    private static Set<String> set(String... values) {
-        HashSet<String> list = new HashSet<String>();
-        list.addAll(Arrays.asList(values));
-        return list;
-    }
-
-    public static TblColRef mockupTblColRef() {
-        TableDesc t = TableDesc.mockup("table_a");
-        return TblColRef.mockup(t, 1, "col_1", "string");
-    }
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 40f1ac5..5dd55b2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -43,10 +43,10 @@ import com.google.common.collect.Sets;
 
 /**
  */
+@SuppressWarnings("serial")
 public class BaseCuboidBuilder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
-    public static final String HIVE_NULL = "\\N";
     protected String cubeName;
     protected Cuboid baseCuboid;
     protected CubeDesc cubeDesc;
@@ -95,7 +95,6 @@ public class BaseCuboidBuilder implements java.io.Serializable {
 
     private void initNullBytes() {
         nullStrs = Sets.newHashSet();
-        nullStrs.add(HIVE_NULL);
         String[] nullStrings = cubeDesc.getNullStrings();
         if (nullStrings != null) {
             for (String s : nullStrings) {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 0ad4b9e..091f9a2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.engine.mr.steps;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -34,15 +38,10 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
 /**
  */
 abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
-    public static final String HIVE_NULL = "\\N";
     public static final byte[] ONE = Bytes.toBytes("1");
     protected String cubeName;
     protected String segmentID;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
index f3bdabd..4305a25 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -104,7 +104,6 @@ public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
 
     private void setupReducer(Path output, CubeSegment cubeSeg) throws IOException {
         int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance());
-        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase);
 
         job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 9bede82..4ea04d5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -55,8 +55,7 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
 
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
     }
 
     @Override
@@ -65,8 +64,6 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
         if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
             Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
             return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
-        } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) {
-            return reducerMapping.getReducerIdForDatePartitionColumn();
         } else {
             return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index f96944a..8f5d176 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -131,6 +131,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             throws IOException {
         FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
         int numberOfReducers = reducerMapping.getTotalReducerNum();
+        logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers);
         if (numberOfReducers > 250) {
             throw new IllegalArgumentException(
                     "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
@@ -141,7 +142,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         job.setReducerClass(FactDistinctColumnsReducer.class);
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);
-        job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum());
 
         // make each reducer output to respective dir
         MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 569b810..fc9dc65 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -37,7 +37,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,9 +67,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
     private static final Text EMPTY_TEXT = new Text();
 
-    private int partitionColumnIndex = -1;
-    private boolean needFetchPartitionCol = true;
-
     private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
 
     @Override
@@ -96,18 +92,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
         }
 
-        TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-        if (partitionColRef != null) {
-            partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
-        }
-
-        // check whether need fetch the partition col values
-        if (partitionColumnIndex < 0) {
-            // if partition col not on cube, no need
-            needFetchPartitionCol = false;
-        } else {
-            needFetchPartitionCol = true;
-        }
         //for KYLIN-2518 backward compatibility
         boolean isUsePutRowKeyToHllNewAlgorithm;
         if (KylinVersion.isBefore200(cubeDesc.getVersion())) {
@@ -172,12 +156,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
         for (String[] row : rowCollection) {
             context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
-            for (int i = 0; i < dictCols.size(); i++) {
-                String fieldValue = row[dictionaryColumnIndex[i]];
+            for (int i = 0; i < allDimDictCols.size(); i++) {
+                String fieldValue = row[columnIndex[i]];
                 if (fieldValue == null)
                     continue;
 
-                int reducerIndex = reducerMapping.getReducerIdForDictCol(i, fieldValue);
+                int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
                 
                 tmpbuf.clear();
                 byte[] valueBytes = Bytes.toBytes(fieldValue);
@@ -188,15 +172,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
                 tmpbuf.put(valueBytes);
                 outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                DataType type = dictCols.get(i).getType();
+                DataType type = allDimDictCols.get(i).getType();
                 sortableKey.init(outputKey, type);
-                //judge type
                 context.write(sortableKey, EMPTY_TEXT);
 
                 // log a few rows for troubleshooting
                 if (rowCount < 10) {
                     logger.info(
-                            "Sample output: " + dictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+                            "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
                 }
             }
 
@@ -204,22 +187,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 putRowKeyToHLL(row);
             }
 
-            if (needFetchPartitionCol == true) {
-                String fieldValue = row[partitionColumnIndex];
-                if (fieldValue != null) {
-                    tmpbuf.clear();
-                    byte[] valueBytes = Bytes.toBytes(fieldValue);
-                    int size = valueBytes.length + 1;
-                    if (size >= tmpbuf.capacity()) {
-                        tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
-                    }
-                    tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL);
-                    tmpbuf.put(valueBytes);
-                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                    sortableKey.init(outputKey, (byte) 0);
-                    context.write(sortableKey, EMPTY_TEXT);
-                }
-            }
             rowCount++;
         }
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index c66042b..ceddeb5 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -39,8 +39,6 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.collect.Lists;
-
 /**
  */
 abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
@@ -50,8 +48,8 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected CubeSegment cubeSeg;
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
-    protected List<TblColRef> dictCols;
     protected IMRTableInputFormat flatTableInputFormat;
+    protected List<TblColRef> allDimDictCols;
 
     protected Text outputKey = new Text();
     //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
@@ -59,7 +57,7 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected int errorRecordCounter = 0;
 
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
-    protected int[] dictionaryColumnIndex;
+    protected int[] columnIndex;
 
     protected FactDistinctColumnsReducerMapping reducerMapping;
     
@@ -74,20 +72,18 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
         cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
+        allDimDictCols = reducerMapping.getAllDimDictCols();
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
         intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
-        dictionaryColumnIndex = new int[dictCols.size()];
-        for (int i = 0; i < dictCols.size(); i++) {
-            TblColRef colRef = dictCols.get(i);
+        columnIndex = new int[allDimDictCols.size()];
+        for (int i = 0; i < allDimDictCols.size(); i++) {
+            TblColRef colRef = allDimDictCols.get(i);
             int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
-            dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+            columnIndex[i] = columnIndexOnFlatTbl;
         }
-
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 801771a..61ba247 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -71,17 +70,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private boolean isStatistics = false;
     private KylinConfig cubeConfig;
     private int taskId;
-    private boolean isPartitionCol = false;
     private int rowCount = 0;
     private FactDistinctColumnsReducerMapping reducerMapping;
 
     //local build dict
     private boolean buildDictInReducer;
     private IDictionaryBuilder builder;
-    private long timeMaxValue = Long.MIN_VALUE;
-    private long timeMinValue = Long.MAX_VALUE;
+    private String maxValue = null;
+    private String minValue = null;
     public static final String DICT_FILE_POSTFIX = ".rldict";
-    public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
+    public static final String DIMENSION_COL_INFO_FILE_POSTFIX = ".dci";
 
     private MultipleOutputs mos;
 
@@ -97,11 +95,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         cubeConfig = cube.getConfig();
         cubeDesc = cube.getDescriptor();
 
-        int numberOfTasks = context.getNumReduceTasks();
         taskId = context.getTaskAttemptID().getTaskID().getId();
 
-        reducerMapping = new FactDistinctColumnsReducerMapping(cube,
-                conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1));
+        reducerMapping = new FactDistinctColumnsReducerMapping(cube);
         
         logger.info("reducer no " + taskId + ", role play " + reducerMapping.getRolePlayOfReducer(taskId));
 
@@ -114,18 +110,9 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             samplingPercentage = Integer
                     .parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
             logger.info("Reducer " + taskId + " handling stats");
-        } else if (reducerMapping.isPartitionColReducer(taskId)) {
-            // partition col
-            isPartitionCol = true;
-            col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (col == null) {
-                logger.info("No partition col. This reducer will do nothing");
-            } else {
-                logger.info("Reducer " + taskId + " handling partition col " + col.getIdentity());
-            }
         } else {
             // normal col
-            col = reducerMapping.getDictColForReducer(taskId);
+            col = reducerMapping.getColForReducer(taskId);
             Preconditions.checkNotNull(col);
 
             // local build dict
@@ -133,7 +120,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
             if (cubeDesc.getDictionaryBuilderClass(col) != null) { // only works with default dictionary builder
                 buildDictInReducer = false;
             }
-            if (reducerMapping.getReducerNumForDictCol(col) > 1) {
+            if (reducerMapping.getReducerNumForDimCol(col) > 1) {
                 buildDictInReducer = false; // only works if this is the only reducer of a dictionary column
             }
             if (buildDictInReducer) {
@@ -167,24 +154,29 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                     cuboidHLLMap.put(cuboidId, hll);
                 }
             }
-        } else if (isPartitionCol) {
-            // partition col
+        } else {
             String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
             logAFewRows(value);
-            long time = DateFormat.stringToMillis(value);
-            timeMinValue = Math.min(timeMinValue, time);
-            timeMaxValue = Math.max(timeMaxValue, time);
-        } else {
-            // normal col
-            if (buildDictInReducer) {
-                String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
-                logAFewRows(value);
-                builder.addValue(value);
-            } else {
-                byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-                // output written to baseDir/colName/-r-00000 (etc)
-                String fileName = col.getIdentity() + "/";
-                mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+            // if dimension col, compute max/min value
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                if (minValue == null || col.getType().compare(minValue, value) > 0) {
+                    minValue = value;
+                }
+                if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
+                    maxValue = value;
+                }
+            }
+
+            //if dict column
+            if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
+                if (buildDictInReducer) {
+                    builder.addValue(value);
+                } else {
+                    byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
+                    // output written to baseDir/colName/-r-00000 (etc)
+                    String fileName = col.getIdentity() + "/";
+                    mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+                }
             }
         }
 
@@ -207,11 +199,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
 
             logMapperAndCuboidStatistics(allCuboids); // for human check
             outputStatistics(allCuboids);
-        } else if (isPartitionCol) {
-            // partition col
-            outputPartitionInfo();
         } else {
-            // normal col
+            //dimension col
+            if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
+                outputDimRangeInfo();
+            }
+            // dic col
             if (buildDictInReducer) {
                 Dictionary<String> dict = builder.build();
                 outputDict(col, dict);
@@ -221,14 +214,17 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         mos.close();
     }
 
-    private void outputPartitionInfo() throws IOException, InterruptedException {
-        if (col != null) {
-            // output written to baseDir/colName/colName.pci-r-00000 (etc)
-            String partitionFileName = col.getIdentity() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
-
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
-            logger.info("write partition info for col : " + col.getName() + "  minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+    private void outputDimRangeInfo() throws IOException, InterruptedException {
+        if (col != null && minValue != null) {
+            // output written to baseDir/colName/colName.dci-r-00000 (etc)
+            String dimRangeFileName = col.getIdentity() + "/" + col.getName() + DIMENSION_COL_INFO_FILE_POSTFIX;
+
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(minValue.getBytes()),
+                    dimRangeFileName);
+            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new Text(maxValue.getBytes()),
+                    dimRangeFileName);
+            logger.info("write dimension range info for col : " + col.getName() + "  minValue:" + minValue + " maxValue:"
+                    + maxValue);
         }
     }
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
index 51594c3..b60e4ce 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMapping.java
@@ -18,73 +18,74 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.MapReduceUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import com.google.common.collect.Lists;
+
 /**
  * Reducers play different roles based on reducer-id:
- * - (start from 0) one reducer for each dictionary column, UHC may have more than one reducer
- * - one reducer to get min/max of date partition column
+ * - (start from 0) one reducer for each dimension column, dictionary column, UHC may have more than one reducer
  * - (at the end) one or more reducers to collect row counts for cuboids using HLL
  */
 public class FactDistinctColumnsReducerMapping {
 
-    public static final int MARK_FOR_PARTITION_COL = -2;
     public static final int MARK_FOR_HLL_COUNTER = -1;
 
-    final private int nDictValueCollectors;
-    final private int datePartitionReducerId;
     final private int nCuboidRowCounters;
+    final private int nDimReducers;
     final private int nTotalReducers;
 
-    final private List<TblColRef> allDictCols;
-    final private int[] dictColIdToReducerBeginId;
+    final private List<TblColRef> allDimDictCols = Lists.newArrayList();
+    final private int[] colIdToReducerBeginId;
     final private int[] reducerRolePlay; // >=0 for dict col id, <0 for partition col and hll counter (using markers)
 
     public FactDistinctColumnsReducerMapping(CubeInstance cube) {
         this(cube, 0);
     }
 
-    public FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
+    private FactDistinctColumnsReducerMapping(CubeInstance cube, int cuboidRowCounterReducerNum) {
         CubeDesc desc = cube.getDescriptor();
+        Set<TblColRef> allCols = cube.getAllColumns();
+        Set<TblColRef> dictCols = desc.getAllColumnsNeedDictionaryBuilt();
+        List<TblColRef> dimCols = desc.listDimensionColumnsExcludingDerived(true);
+        for (TblColRef colRef : allCols) {
+            if (dictCols.contains(colRef)) {
+                allDimDictCols.add(colRef);
+            } else if (dimCols.indexOf(colRef) >= 0){
+                allDimDictCols.add(colRef);
+            }
+        }
 
-        allDictCols = new ArrayList(desc.getAllColumnsNeedDictionaryBuilt());
-
-        dictColIdToReducerBeginId = new int[allDictCols.size() + 1];
+        colIdToReducerBeginId = new int[allDimDictCols.size() + 1];
 
         int uhcReducerCount = cube.getConfig().getUHCReducerCount();
         List<TblColRef> uhcList = desc.getAllUHCColumns();
         int counter = 0;
-        for (int i = 0; i < allDictCols.size(); i++) {
-            dictColIdToReducerBeginId[i] = counter;
-            boolean isUHC = uhcList.contains(allDictCols.get(i));
+        for (int i = 0; i < allDimDictCols.size(); i++) {
+            colIdToReducerBeginId[i] = counter;
+            boolean isUHC = uhcList.contains(allDimDictCols.get(i));
             counter += (isUHC) ? uhcReducerCount : 1;
         }
-
-        dictColIdToReducerBeginId[allDictCols.size()] = counter;
-        nDictValueCollectors = counter;
-        datePartitionReducerId = counter;
+        colIdToReducerBeginId[allDimDictCols.size()] = counter;
+        nDimReducers = counter;
 
         nCuboidRowCounters = cuboidRowCounterReducerNum == 0 ? //
                 MapReduceUtil.getCuboidHLLCounterReducerNum(cube) : cuboidRowCounterReducerNum;
-        nTotalReducers = nDictValueCollectors + 1 + nCuboidRowCounters;
+        nTotalReducers = nDimReducers + nCuboidRowCounters;
 
         reducerRolePlay = new int[nTotalReducers];
         for (int i = 0, dictId = 0; i < nTotalReducers; i++) {
-            if (i > datePartitionReducerId) {
+            if (i >= nDimReducers) {
                 // cuboid HLL counter reducer
                 reducerRolePlay[i] = MARK_FOR_HLL_COUNTER;
-            } else if (i == datePartitionReducerId) {
-                // date partition min/max reducer
-                reducerRolePlay[i] = MARK_FOR_PARTITION_COL;
             } else {
-                // dict value collector reducer
-                if (i == dictColIdToReducerBeginId[dictId + 1])
+                if (i == colIdToReducerBeginId[dictId + 1])
                     dictId++;
 
                 reducerRolePlay[i] = dictId;
@@ -92,8 +93,8 @@ public class FactDistinctColumnsReducerMapping {
         }
     }
     
-    public List<TblColRef> getAllDictCols() {
-        return allDictCols;
+    public List<TblColRef> getAllDimDictCols() {
+        return allDimDictCols;
     }
     
     public int getTotalReducerNum() {
@@ -104,9 +105,9 @@ public class FactDistinctColumnsReducerMapping {
         return nCuboidRowCounters;
     }
 
-    public int getReducerIdForDictCol(int dictColId, Object fieldValue) {
-        int begin = dictColIdToReducerBeginId[dictColId];
-        int span = dictColIdToReducerBeginId[dictColId + 1] - begin;
+    public int getReducerIdForCol(int colId, Object fieldValue) {
+        int begin = colIdToReducerBeginId[colId];
+        int span = colIdToReducerBeginId[colId + 1] - begin;
         
         if (span == 1)
             return begin;
@@ -120,37 +121,29 @@ public class FactDistinctColumnsReducerMapping {
     }
 
     public int getRolePlayOfReducer(int reducerId) {
-        return reducerRolePlay[reducerId];
+        return reducerRolePlay[reducerId % nTotalReducers];
     }
     
     public boolean isCuboidRowCounterReducer(int reducerId) {
         return getRolePlayOfReducer(reducerId) == MARK_FOR_HLL_COUNTER;
     }
-    
-    public boolean isPartitionColReducer(int reducerId) {
-        return getRolePlayOfReducer(reducerId) == MARK_FOR_PARTITION_COL;
-    }
 
-    public TblColRef getDictColForReducer(int reducerId) {
-        int role = getRolePlayOfReducer(reducerId);
+    public TblColRef getColForReducer(int reducerId) {
+        int role = getRolePlayOfReducer(reducerId % nTotalReducers);
         if (role < 0)
             throw new IllegalStateException();
         
-        return allDictCols.get(role);
-    }
-
-    public int getReducerNumForDictCol(TblColRef col) {
-        int dictColId = allDictCols.indexOf(col);
-        return dictColIdToReducerBeginId[dictColId + 1] - dictColIdToReducerBeginId[dictColId];
+        return allDimDictCols.get(role);
     }
 
-    public int getReducerIdForDatePartitionColumn() {
-        return datePartitionReducerId;
+    public int getReducerNumForDimCol(TblColRef col) {
+        int dictColId = allDimDictCols.indexOf(col);
+        return colIdToReducerBeginId[dictColId + 1] - colIdToReducerBeginId[dictColId];
     }
 
     public int getReducerIdForCuboidRowCount(long cuboidId) {
         int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters);
-        return datePartitionReducerId + 1 + rowCounterId;
+        return nDimReducers + rowCounterId;
     }
     
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f749c80..fdb19db 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -23,15 +23,18 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.LookupMaterializeContext;
@@ -40,11 +43,14 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
  */
 public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
@@ -76,9 +82,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
         try {
             saveExtSnapshotIfNeeded(cubeManager, cube, segment);
-            if (segment.isOffsetCube()) {
-                updateTimeRange(segment);
-            }
+            updateSegment(segment);
 
             cubeManager.promoteNewlyBuiltSegments(cube, segment);
             return new ExecuteResult();
@@ -115,40 +119,51 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         }
     }
 
-    private void updateTimeRange(CubeSegment segment) throws IOException {
+    private void updateSegment(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
 
-        if (partitionCol == null) {
-            return;
-        }
-        final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        Path colDir = new Path(factColumnsInputPath, partitionCol.getIdentity());
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir,
-                partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
-        if (outputFile == null) {
-            throw new IOException("fail to find the partition file in base dir: " + colDir);
-        }
-
-        FSDataInputStream is = null;
-        BufferedReader bufferedReader = null;
-        InputStreamReader isr = null;
-        long minValue, maxValue;
-        try {
-            is = fs.open(outputFile);
-            isr = new InputStreamReader(is);
-            bufferedReader = new BufferedReader(isr);
-            minValue = Long.parseLong(bufferedReader.readLine());
-            maxValue = Long.parseLong(bufferedReader.readLine());
-        } finally {
-            IOUtils.closeQuietly(is);
-            IOUtils.closeQuietly(isr);
-            IOUtils.closeQuietly(bufferedReader);
-        }
+        for (TblColRef dimColRef : segment.getCubeDesc().listDimensionColumnsExcludingDerived(true)) {
+            final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+            Path colDir = new Path(factColumnsInputPath, dimColRef.getIdentity());
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+
+            //handle multiple reducers
+            Path[] outputFiles = HadoopUtil.getFilteredPath(fs, colDir,
+                    dimColRef.getName() + FactDistinctColumnsReducer.DIMENSION_COL_INFO_FILE_POSTFIX);
+            if (outputFiles == null || outputFiles.length == 0) {
+                segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(null, null));
+                continue;
+            }
 
-        logger.info("updateTimeRange step. minValue:" + minValue + " maxValue:" + maxValue);
-        if (minValue != timeMinValue && maxValue != timeMaxValue) {
-            segment.setTSRange(new TSRange(minValue, maxValue + 1));
+            FSDataInputStream is = null;
+            BufferedReader bufferedReader = null;
+            InputStreamReader isr = null;
+            Set<String> minValues = Sets.newHashSet(), maxValues = Sets.newHashSet();
+            for (Path outputFile : outputFiles) {
+                try {
+                    is = fs.open(outputFile);
+                    isr = new InputStreamReader(is);
+                    bufferedReader = new BufferedReader(isr);
+                    minValues.add(bufferedReader.readLine());
+                    maxValues.add(bufferedReader.readLine());
+                } finally {
+                    IOUtils.closeQuietly(is);
+                    IOUtils.closeQuietly(isr);
+                    IOUtils.closeQuietly(bufferedReader);
+                }
+            }
+            DataTypeOrder order = dimColRef.getType().getOrder();
+            String minValue = order.min(minValues);
+            String maxValue = order.max(maxValues);
+            logger.info("updateSegment step. {} minValue:" + minValue + " maxValue:" + maxValue, dimColRef.getName());
+
+            if (segment.isOffsetCube() && partitionCol != null && partitionCol.getIdentity().equals(dimColRef.getIdentity())) {
+                logger.info("update partition. {} timeMinValue:" + minValue + " timeMaxValue:" + maxValue, dimColRef.getName());
+                if (DateFormat.stringToMillis(minValue) != timeMinValue && DateFormat.stringToMillis(maxValue) != timeMaxValue) {
+                    segment.setTSRange(new TSRange(DateFormat.stringToMillis(minValue), DateFormat.stringToMillis(maxValue) + 1));
+                }
+            }
+            segment.getDimensionRangeInfoMap().put(dimColRef.getIdentity(), new DimensionRangeInfo(minValue, maxValue));
         }
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
index e7aec8c..4cf6fc6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
@@ -20,10 +20,12 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.exception.SegmentNotFoundException;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -61,9 +63,9 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         if (mergingSegmentIds.isEmpty()) {
             return ExecuteResult.createFailed(new SegmentNotFoundException("there are no merging segments"));
         }
+        
         long sourceCount = 0L;
         long sourceSize = 0L;
-
         boolean isOffsetCube = mergedSegment.isOffsetCube();
         Long tsStartMin = Long.MAX_VALUE, tsEndMax = 0L;
         for (String id : mergingSegmentIds) {
@@ -74,14 +76,26 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
             tsEndMax = Math.max(tsEndMax, segment.getTSRange().end.v);
         }
 
+        Map<String, DimensionRangeInfo> mergedSegDimRangeMap = null;
+        for (String id : mergingSegmentIds) {
+            CubeSegment segment = cube.getSegmentById(id);
+            Map<String, DimensionRangeInfo> segDimRangeMap = segment.getDimensionRangeInfoMap();
+            if (mergedSegDimRangeMap == null) {
+                mergedSegDimRangeMap = segDimRangeMap;
+            } else {
+                mergedSegDimRangeMap = DimensionRangeInfo.mergeRangeMap(cube.getModel(), segDimRangeMap,
+                        mergedSegDimRangeMap);
+            }
+        }
+
         // update segment info
         mergedSegment.setSizeKB(cubeSizeBytes / 1024);
         mergedSegment.setInputRecords(sourceCount);
         mergedSegment.setInputRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
-        if (isOffsetCube == true) {
+        mergedSegment.setDimensionRangeInfoMap(mergedSegDimRangeMap);
+        if (isOffsetCube) {
             SegmentRange.TSRange tsRange = new SegmentRange.TSRange(tsStartMin, tsEndMax);
             mergedSegment.setTSRange(tsRange);
         }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
index d013386..03aa616 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -59,6 +59,7 @@ public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
         segment.setSizeKB(cubeSizeBytes / 1024);
         segment.setInputRecords(sourceCount);
         segment.setInputRecordsSize(sourceSizeBytes);
+        segment.setDimensionRangeInfoMap(originalSegment.getDimensionRangeInfoMap());
 
         try {
             cubeManager.promoteNewlyOptimizeSegments(cube, segment);
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index a6bc019..3c58b26 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -60,24 +60,22 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         int totalReducerNum = mapping.getTotalReducerNum();
         Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
         
-        // check partition column reducer & cuboid row count reducers
+        // check cuboid row count reducers
         Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
                 mapping.getRolePlayOfReducer(totalReducerNum - 1));
         Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER,
                 mapping.getRolePlayOfReducer(totalReducerNum - 2));
-        Assert.assertEquals(FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL,
-                mapping.getRolePlayOfReducer(totalReducerNum - 3));
         
         // check all dict column reducers
-        int dictEnd = totalReducerNum - 3;
+        int dictEnd = totalReducerNum - 2;
         for (int i = 0; i < dictEnd; i++)
             Assert.assertTrue(mapping.getRolePlayOfReducer(i) >= 0);
         
         // check a UHC dict column
-        Assert.assertEquals(2, mapping.getReducerNumForDictCol(aUHC));
+        Assert.assertEquals(2, mapping.getReducerNumForDimCol(aUHC));
         int uhcReducerBegin = -1;
         for (int i = 0; i < dictEnd; i++) {
-            if (mapping.getDictColForReducer(i).equals(aUHC)) {
+            if (mapping.getColForReducer(i).equals(aUHC)) {
                 uhcReducerBegin = i;
                 break;
             }
@@ -86,7 +84,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         int[] allRolePlay = mapping.getAllRolePlaysForReducers();
         Assert.assertEquals(allRolePlay[uhcReducerBegin], allRolePlay[uhcReducerBegin + 1]);
         for (int i = 0; i < 5; i++) {
-            int reducerId = mapping.getReducerIdForDictCol(uhcReducerBegin, i);
+            int reducerId = mapping.getReducerIdForCol(uhcReducerBegin, i);
             Assert.assertTrue(uhcReducerBegin <= reducerId && reducerId <= uhcReducerBegin + 1);
         }
     }
diff --git a/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..124081c
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube/ssb_cube_with_dimention_range.json
@@ -0,0 +1,110 @@
+{
+  "uuid" : "70a9f288-3c01-4745-a04b-5641e82d6c72",
+  "name" : "ssb_cube_with_dimention_range",
+  "owner" : "ADMIN",
+  "cost" : 50,
+  "status" : "DISABLED",
+  "segments" : [{
+    "uuid": "f34e2a88-50e4-4c8c-8dd1-64e922ce8c37",
+    "name": "19920523163722_20180523173026",
+    "storage_location_identifier": "KYLIN_KFV177RJOS",
+    "date_range_start": 706639042000,
+    "date_range_end": 1527096626000,
+    "source_offset_start": 0,
+    "source_offset_end": 0,
+    "status": "READY",
+    "size_kb": 100,
+    "input_records": 1000,
+    "input_records_size": 102400,
+    "last_build_time": 1527068515334,
+    "last_build_job_id": "a5010166-b64a-4289-ac56-064640f7f2fa",
+    "create_time_utc": 1527067828660,
+    "total_shards": 0,
+    "blackout_cuboids": [],
+    "binary_signature": null,
+    "dimension_range_info_map": {
+      "PART.P_BRAND": {
+        "min": "MFGR#1101",
+        "max": "MFGR#5540"
+      },
+      "V_LINEORDER.LO_QUANTITY": {
+        "min": "1",
+        "max": "50"
+      },
+      "PART.P_MFGR": {
+        "min": "MFGR#1",
+        "max": "MFGR#5"
+      },
+      "DATES.D_YEARMONTHNUM": {
+        "min": "199205",
+        "max": "199808"
+      },
+      "CUSTOMER.C_NATION": {
+        "min": "ALGERIA",
+        "max": "VIETNAM"
+      },
+      "DATES.D_YEARMONTH": {
+        "min": "Apr1993",
+        "max": "Sep1997"
+      },
+      "CUSTOMER.C_CUSTKEY": {
+        "min": "1",
+        "max": "2999"
+      },
+      "DATES.D_DATEKEY": {
+        "min": "19920523",
+        "max": "19980802"
+      },
+      "SUPPLIER.S_SUPPKEY": {
+        "min": "1",
+        "max": "200"
+      },
+      "SUPPLIER.S_REGION": {
+        "min": "AFRICA",
+        "max": "MIDDLE EAST"
+      },
+      "PART.P_PARTKEY": {
+        "min": "1",
+        "max": "20000"
+      },
+      "PART.P_CATEGORY": {
+        "min": "MFGR#11",
+        "max": "MFGR#55"
+      },
+      "DATES.D_WEEKNUMINYEAR": {
+        "min": "1",
+        "max": "53"
+      },
+      "CUSTOMER.C_CITY": {
+        "min": "ALGERIA  0",
+        "max": "VIETNAM  9"
+      },
+      "SUPPLIER.S_NATION": {
+        "min": "ALGERIA",
+        "max": "VIETNAM"
+      },
+      "SUPPLIER.S_CITY": {
+        "min": "ALGERIA  1",
+        "max": "VIETNAM  9"
+      },
+      "CUSTOMER.C_REGION": {
+        "min": "AFRICA",
+        "max": "MIDDLE EAST"
+      },
+      "DATES.D_YEAR": {
+        "min": "1992",
+        "max": "1998"
+      },
+      "V_LINEORDER.LO_DISCOUNT": {
+        "min": "0",
+        "max": "10"
+      }
+    }
+  } ],
+  "last_modified" : 1457534216410,
+  "descriptor" : "ssb_cube_with_dimention_range",
+  "create_time_utc" : 1457444500888,
+  "size_kb" : 100,
+  "input_records_count" : 1000,
+  "input_records_size" : 102400
+}
\ No newline at end of file
diff --git a/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
new file mode 100644
index 0000000..d91b420
--- /dev/null
+++ b/examples/test_case_data/localmeta/cube_desc/ssb_cube_with_dimention_range.json
@@ -0,0 +1,269 @@
+{
+  "uuid" : "5c44df30-daec-486e-af90-927bf7851060",
+  "name" : "ssb_cube_with_dimention_range",
+  "description" : "",
+  "dimensions" : [ {
+    "name" : "PART.P_MFGR",
+    "table" : "PART",
+    "column" : "P_MFGR",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEARMONTH",
+    "table" : "DATES",
+    "column" : "D_YEARMONTH",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_NATION",
+    "table" : "SUPPLIER",
+    "column" : "S_NATION",
+    "derived" : null
+  }, {
+    "name" : "V_LINEORDER.LO_DISCOUNT",
+    "table" : "V_LINEORDER",
+    "column" : "LO_DISCOUNT",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_CUSTKEY",
+    "table" : "CUSTOMER",
+    "column" : "C_CUSTKEY",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_NATION",
+    "table" : "CUSTOMER",
+    "column" : "C_NATION",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_REGION",
+    "table" : "SUPPLIER",
+    "column" : "S_REGION",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_CITY",
+    "table" : "CUSTOMER",
+    "column" : "C_CITY",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_SUPPKEY",
+    "table" : "SUPPLIER",
+    "column" : "S_SUPPKEY",
+    "derived" : null
+  }, {
+    "name" : "V_LINEORDER.LO_QUANTITY",
+    "table" : "V_LINEORDER",
+    "column" : "LO_QUANTITY",
+    "derived" : null
+  }, {
+    "name" : "PART.P_BRAND",
+    "table" : "PART",
+    "column" : "P_BRAND",
+    "derived" : null
+  }, {
+    "name" : "SUPPLIER.S_CITY",
+    "table" : "SUPPLIER",
+    "column" : "S_CITY",
+    "derived" : null
+  }, {
+    "name" : "PART.P_PARTKEY",
+    "table" : "PART",
+    "column" : "P_PARTKEY",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEARMONTHNUM",
+    "table" : "DATES",
+    "column" : "D_YEARMONTHNUM",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_WEEKNUMINYEAR",
+    "table" : "DATES",
+    "column" : "D_WEEKNUMINYEAR",
+    "derived" : null
+  }, {
+    "name" : "CUSTOMER.C_REGION",
+    "table" : "CUSTOMER",
+    "column" : "C_REGION",
+    "derived" : null
+  }, {
+    "name" : "PART.P_CATEGORY",
+    "table" : "PART",
+    "column" : "P_CATEGORY",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_YEAR",
+    "table" : "DATES",
+    "column" : "D_YEAR",
+    "derived" : null
+  }, {
+    "name" : "DATES.D_DATEKEY",
+    "table" : "DATES",
+    "column" : "D_DATEKEY",
+    "derived" : null
+  } ],
+  "measures" : [ {
+    "name" : "_COUNT_",
+    "function" : {
+      "expression" : "COUNT",
+      "parameter" : {
+        "type" : "constant",
+        "value" : "1",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  }, {
+    "name" : "TOTAL_REVENUE",
+    "function" : {
+      "expression" : "SUM",
+      "parameter" : {
+        "type" : "column",
+        "value" : "LO_REVENUE",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  }, {
+    "name" : "TOTAL_SUPPLYCOST",
+    "function" : {
+      "expression" : "SUM",
+      "parameter" : {
+        "type" : "column",
+        "value" : "LO_SUPPLYCOST",
+        "next_parameter" : null
+      },
+      "returntype" : "bigint"
+    },
+    "dependent_measure_ref" : null
+  } ],
+  "rowkey" : {
+    "rowkey_columns" : [ {
+      "column" : "PART.P_MFGR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEARMONTHNUM",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_REGION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "V_LINEORDER.LO_QUANTITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEARMONTH",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_WEEKNUMINYEAR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_REGION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "V_LINEORDER.LO_DISCOUNT",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_CITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_CATEGORY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_YEAR",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_BRAND",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_CITY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_NATION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_NATION",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "DATES.D_DATEKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "SUPPLIER.S_SUPPKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    }, {
+      "column" : "CUSTOMER.C_CUSTKEY",
+      "encoding" : "dict",
+      "isShardBy" : true,
+      "index" : "eq"
+    }, {
+      "column" : "PART.P_PARTKEY",
+      "encoding" : "dict",
+      "isShardBy" : false,
+      "index" : "eq"
+    } ]
+  },
+  "signature" : "",
+  "last_modified" : 1457503036686,
+  "model_name" : "ssb",
+  "null_string" : null,
+  "hbase_mapping" : {
+    "column_family" : [ {
+      "name" : "F1",
+      "columns" : [ {
+        "qualifier" : "M",
+        "measure_refs" : [ "_COUNT_", "TOTAL_REVENUE", "TOTAL_SUPPLYCOST"]
+      } ]
+    } ]
+  },
+  "aggregation_groups" : [ {
+    "includes" : [ "SUPPLIER.S_REGION", "CUSTOMER.C_NATION", "SUPPLIER.S_NATION", "CUSTOMER.C_REGION", "DATES.D_YEAR", "DATES.D_WEEKNUMINYEAR", "DATES.D_YEARMONTH", "DATES.D_YEARMONTHNUM", "SUPPLIER.S_CITY" ],
+    "select_rule" : {
+      "hierarchy_dims" : [ [ "S_REGION", "S_NATION", "S_CITY" ] ],
+      "mandatory_dims" : [ "DATES.D_YEAR" ],
+      "joint_dims" : [ ]
+    }
+  } ],
+  "notify_list" : [ ],
+  "status_need_notify" : [ ],
+  "partition_date_start" : 3153000000000,
+  "partition_date_end" : 3153600000000,
+  "auto_merge_time_ranges" : [ 604800000, 2419200000 ],
+  "retention_range" : 0,
+  "engine_type" : 2,
+  "storage_type" : 2,
+  "override_kylin_properties" : {
+    "kylin.storage.hbase.compression-codec" : "lz4",
+    "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+  }
+}
\ No newline at end of file
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index afd9788..16ceede 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,15 +18,31 @@
 
 package org.apache.kylin.provision;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
@@ -34,6 +50,7 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
@@ -48,6 +65,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.rest.job.StorageCleanupJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
@@ -55,21 +73,9 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class BuildCubeWithEngine {
 
@@ -304,21 +310,30 @@ public class BuildCubeWithEngine {
         
         if (!buildSegment(cubeName, date1, date2))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date2, date3))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!optimizeCube(cubeName))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date3, date4))
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date4, date5)) // one empty segment
             return false;
+        checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
         if (!buildSegment(cubeName, date5, date6)) // another empty segment
             return false;
+        checkEmptySegRangeInfo(cubeManager.getCube(cubeName));
+
 
         if (!mergeSegment(cubeName, date2, date4)) // merge 2 normal segments
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         if (!mergeSegment(cubeName, date2, date5)) // merge normal and empty
             return false;
+        checkNormalSegRangeInfo(cubeManager.getCube(cubeName));
         
         // now have 2 normal segments [date1, date2) [date2, date5) and 1 empty segment [date5, date6)
         return true;
@@ -460,4 +475,40 @@ public class BuildCubeWithEngine {
         }
     }
 
+    private void checkEmptySegRangeInfo(CubeInstance cube) {
+        CubeSegment segment = getLastModifiedSegment(cube);
+        for (String colId : segment.getDimensionRangeInfoMap().keySet()) {
+            DimensionRangeInfo range = segment.getDimensionRangeInfoMap().get(colId);
+            if (!(range.getMax() == null && range.getMin() == null)) {
+                throw new RuntimeException("Empty segment must have null info.");
+            }
+        }
+    }
+
+    private void checkNormalSegRangeInfo(CubeInstance cube) {
+        CubeSegment segment = getLastModifiedSegment(cube);
+        if (segment.getModel().getPartitionDesc().isPartitioned()) {
+            TblColRef colRef = segment.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            DimensionRangeInfo dmRangeInfo = segment.getDimensionRangeInfoMap().get(colRef.getIdentity());
+            long min_v = DateFormat.stringToMillis(dmRangeInfo.getMin());
+            long max_v = DateFormat.stringToMillis(dmRangeInfo.getMax());
+            long ts_range_start = segment.getTSRange().start.v;
+            long ts_range_end = segment.getTSRange().end.v;
+            if (!(ts_range_start <= min_v && max_v <= ts_range_end -1)) {
+                throw new RuntimeException(String.format(
+                        "Build cube failed, wrong partition column min/max value."
+                                + " Segment: %s, min value: %s, TsRange.start: %s, max value: %s, TsRange.end: %s",
+                        segment, min_v, ts_range_start, max_v, ts_range_end));
+            }
+        }
+    }
+    
+    private CubeSegment getLastModifiedSegment(CubeInstance cube) {
+        return Collections.max(cube.getSegments(), new Comparator<CubeSegment>() {
+            @Override
+            public int compare(CubeSegment o1, CubeSegment o2) {
+                return Long.compare(o1.getLastBuildTime(), o2.getLastBuildTime());
+            }
+        });
+    }
 }
diff --git a/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java b/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
index cdd6004..39c90e8 100644
--- a/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
+++ b/query/src/main/java/org/apache/kylin/query/optrule/AggregateMultipleExpandRule.java
@@ -18,8 +18,12 @@
 
 package org.apache.kylin.query.optrule;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
@@ -33,13 +37,11 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ImmutableBitSet;
 
-import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
 
 /**
- * Supoort grouping query. Expand the non-simple aggregate to more than one simple aggregates.
+ * Support grouping query. Expand the non-simple aggregate to more than one simple aggregates.
  * Add project on expanded simple aggregate to add indicators of origin aggregate.
  * All projects on aggregate added into one union, which replace the origin aggregate.
  * The new aggregates will be transformed by {@link org.apache.kylin.query.optrule.AggregateProjectReduceRule}, to reduce rolled up dimensions.
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 39a2669..b34b42e 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -39,7 +39,9 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
 import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.query.relnode.visitor.TupleFilterVisitor;
 
@@ -117,7 +119,29 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
             }
         }
 
-        context.filter = TupleFilter.and(context.filter, filter);
+        context.filter = and(context.filter, filter);
+    }
+    
+    private TupleFilter and(TupleFilter f1, TupleFilter f2) {
+        if (f1 == null)
+            return f2;
+        if (f2 == null)
+            return f1;
+
+        if (f1.getOperator() == FilterOperatorEnum.AND) {
+            f1.addChild(f2);
+            return f1;
+        }
+
+        if (f2.getOperator() == FilterOperatorEnum.AND) {
+            f2.addChild(f1);
+            return f2;
+        }
+
+        LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND);
+        and.addChild(f1);
+        and.addChild(f2);
+        return and;
     }
 
     @Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 8aec8b9..bfea632 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -45,7 +45,6 @@ import com.google.common.collect.Lists;
 
 public class HiveMRInput extends HiveInputBase implements IMRInput {
 
-    @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
 
     @Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 75f322f..4ebdf3d 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -126,7 +126,7 @@ public class HiveTableReader implements TableReader {
         String[] arr = new String[record.size()];
         for (int i = 0; i < arr.length; i++) {
             Object o = record.get(i);
-            arr[i] = (o == null) ? null : o.toString();
+            arr[i] = (o == null || "\\N".equals(o)) ? null : o.toString();
         }
         return arr;
     }


[kylin] 06/06: KYLIN-3403 Use IntegerCodeSystem for date type filter

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 31073af0285bb43cdee5ec844ef455d98e193c86
Author: Yifan Zhang <ev...@gmail.com>
AuthorDate: Tue Jun 12 14:57:29 2018 +0800

    KYLIN-3403 Use IntegerCodeSystem for date type filter
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../metadata/filter/FilterCodeSystemFactory.java   |  2 ++
 kylin-it/src/test/resources/query/sql/query112.sql | 29 ++++++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FilterCodeSystemFactory.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FilterCodeSystemFactory.java
index bae8cf9..cf98956 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FilterCodeSystemFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/FilterCodeSystemFactory.java
@@ -41,6 +41,8 @@ public class FilterCodeSystemFactory {
             return codeSystemMap.get("integer");
         } else if (dataType.isNumberFamily()) {
             return codeSystemMap.get("decimal");
+        } else if (dataType.isDateTimeFamily()) {
+            return codeSystemMap.get("integer");
         } else {
             return codeSystemMap.get("string");
         }
diff --git a/kylin-it/src/test/resources/query/sql/query112.sql b/kylin-it/src/test/resources/query/sql/query112.sql
new file mode 100644
index 0000000..efd0d88
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql/query112.sql
@@ -0,0 +1,29 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT COUNT(1) AS TRANS_CNT
+FROM test_kylin_fact 
+ inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id
+WHERE test_cal_dt.WEEK_BEG_DT >= '2001-09-09'
+ AND test_cal_dt.WEEK_BEG_DT <= '2018-05-16'
+ 
\ No newline at end of file


[kylin] 05/06: KYLIN-3423 Performance improvement in FactDistinctColumnsMapper

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d1ed1072728ca0bd12b83a04e2308784b2e045a6
Author: ZhansShaoxiong <sh...@gmail.com>
AuthorDate: Tue Jun 26 02:07:42 2018 +0800

    KYLIN-3423 Performance improvement in FactDistinctColumnsMapper
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/cube/DimensionRangeInfo.java  |   8 ++
 .../engine/mr/steps/FactDistinctColumnsMapper.java | 127 +++++++++++++++++----
 .../mr/steps/FactDistinctColumnsMapperBase.java    |  11 +-
 .../FactDistinctColumnsReducerMappingTest.java     |   4 +-
 .../kylin/engine/mr/steps/DictColDeduperTest.java  |  65 +++++++++++
 5 files changed, 185 insertions(+), 30 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
index e36ca96..0b0d1c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -92,9 +92,17 @@ public class DimensionRangeInfo {
         return min;
     }
 
+    public void setMin(String min) {
+        this.min = min;
+    }
+
     public String getMax() {
         return max;
     }
+
+    public void setMax(String max) {
+        this.max = max;
+    }
     
     @Override
     public String toString() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index fc9dc65..7bffce7 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.DimensionRangeInfo;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
@@ -37,9 +41,11 @@ import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
@@ -62,6 +68,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     private int rowCount = 0;
     private int samplingPercentage;
     private ByteBuffer tmpbuf;
+    
+    private DictColDeduper dictColDeduper;
+    private Map<Integer, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
 
     private CuboidStatCalculator[] cuboidStatCalculators;
 
@@ -132,6 +141,14 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             cuboidStatCalculators[i] = calculator;
             calculator.start();
         }
+        
+        // setup dict col deduper
+        dictColDeduper = new DictColDeduper();
+        Set<TblColRef> dictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt();
+        for (int i = 0; i < allCols.size(); i++) {
+            if (dictCols.contains(allCols.get(i)))
+                dictColDeduper.setIsDictCol(i);
+        }
     }
 
     private int getStatsThreadNum(int cuboidNum) {
@@ -156,41 +173,42 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
         for (String[] row : rowCollection) {
             context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
-            for (int i = 0; i < allDimDictCols.size(); i++) {
+            for (int i = 0; i < allCols.size(); i++) {
                 String fieldValue = row[columnIndex[i]];
                 if (fieldValue == null)
                     continue;
 
-                int reducerIndex = reducerMapping.getReducerIdForCol(i, fieldValue);
-                
-                tmpbuf.clear();
-                byte[] valueBytes = Bytes.toBytes(fieldValue);
-                int size = valueBytes.length + 1;
-                if (size >= tmpbuf.capacity()) {
-                    tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
-                }
-                tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
-                tmpbuf.put(valueBytes);
-                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                DataType type = allDimDictCols.get(i).getType();
-                sortableKey.init(outputKey, type);
-                context.write(sortableKey, EMPTY_TEXT);
-
-                // log a few rows for troubleshooting
-                if (rowCount < 10) {
-                    logger.info(
-                            "Sample output: " + allDimDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+                final DataType type = allCols.get(i).getType();
+
+                //for dic column, de dup before write value; for dim not dic column, hold util doCleanup()
+                if (dictColDeduper.isDictCol(i)) {
+                    if (dictColDeduper.add(i, fieldValue)) {
+                        writeFieldValue(context, type, i, fieldValue);
+                    }
+                } else {
+                    DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
+                    if (old == null) {
+                        old = new DimensionRangeInfo(fieldValue, fieldValue);
+                        dimensionRangeInfoMap.put(i, old);
+                    } else {
+                        old.setMax(type.getOrder().max(old.getMax(), fieldValue));
+                        old.setMin(type.getOrder().min(old.getMin(), fieldValue));
+                    }
                 }
             }
 
             if (rowCount % 100 < samplingPercentage) {
                 putRowKeyToHLL(row);
             }
+            
+            if (rowCount % 100 == 0) {
+                dictColDeduper.resetIfShortOfMem();
+            }
 
             rowCount++;
         }
     }
-
+    
     private void putRowKeyToHLL(String[] row) {
         for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
             cuboidStatCalculator.putRow(row);
@@ -231,6 +249,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 context.write(sortableKey, outputValue);
             }
         }
+        for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
+            DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
+            DataType dataType = allCols.get(colIndex).getType();
+            writeFieldValue(context, dataType, colIndex, rangeInfo.getMin());
+            writeFieldValue(context, dataType, colIndex, rangeInfo.getMax());
+        }
     }
 
     private int countNewSize(int oldSize, int dataSize) {
@@ -241,6 +265,26 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
         return newSize;
     }
 
+    private void writeFieldValue(Context context, DataType type, Integer colIndex, String value)
+            throws IOException, InterruptedException {
+        int reducerIndex = reducerMapping.getReducerIdForCol(colIndex, value);
+        tmpbuf.clear();
+        byte[] valueBytes = Bytes.toBytes(value);
+        int size = valueBytes.length + 1;
+        if (size >= tmpbuf.capacity()) {
+            tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+        }
+        tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+        tmpbuf.put(valueBytes);
+        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+        sortableKey.init(outputKey, type);
+        context.write(sortableKey, EMPTY_TEXT);
+        // log a few rows for troubleshooting
+        if (rowCount < 10) {
+            logger.info("Sample output: " + allCols.get(colIndex) + " '" + value + "' => reducer " + reducerIndex);
+        }
+    }
+
     public static class CuboidStatCalculator implements Runnable {
         private final int id;
         private final int nRowKey;
@@ -371,4 +415,45 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             }
         }
     }
+    
+    public static class DictColDeduper {
+
+        final boolean enabled;
+        final int resetThresholdMB;
+        final Map<Integer, Set<String>> colValueSets = Maps.newHashMap();
+        
+        public DictColDeduper() {
+            this(200, 100);
+        }
+        
+        public DictColDeduper(int enableThresholdMB, int resetThresholdMB) {
+            // only enable when there is sufficient memory
+            this.enabled = MemoryBudgetController.getSystemAvailMB() >= enableThresholdMB;
+            this.resetThresholdMB = resetThresholdMB;
+        }
+        
+        public void setIsDictCol(int i) {
+            colValueSets.put(i, new HashSet<String>());
+        }
+        
+        public boolean isDictCol(int i) {
+            return colValueSets.containsKey(i);
+        }
+
+        public boolean add(int i, String fieldValue) {
+            return colValueSets.get(i).add(fieldValue);
+        }
+        
+        public Set<String> getValueSet(int i) {
+            return colValueSets.get(i);
+        }
+
+        public void resetIfShortOfMem() {
+            if (MemoryBudgetController.getSystemAvailMB() < resetThresholdMB) {
+                for (Set<String> set : colValueSets.values())
+                    set.clear();
+            }
+        }
+        
+    }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index ceddeb5..ad9030c 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -49,10 +49,9 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
     protected IMRTableInputFormat flatTableInputFormat;
-    protected List<TblColRef> allDimDictCols;
+    protected List<TblColRef> allCols;
 
     protected Text outputKey = new Text();
-    //protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
     protected Text outputValue = new Text();
     protected int errorRecordCounter = 0;
 
@@ -73,14 +72,14 @@ abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends Kyli
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         reducerMapping = new FactDistinctColumnsReducerMapping(cube);
-        allDimDictCols = reducerMapping.getAllDimDictCols();
+        allCols = reducerMapping.getAllDimDictCols();
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
         intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
-        columnIndex = new int[allDimDictCols.size()];
-        for (int i = 0; i < allDimDictCols.size(); i++) {
-            TblColRef colRef = allDimDictCols.get(i);
+        columnIndex = new int[allCols.size()];
+        for (int i = 0; i < allCols.size(); i++) {
+            TblColRef colRef = allCols.get(i);
             int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
             columnIndex[i] = columnIndexOnFlatTbl;
         }
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
index 3c58b26..9a00d55 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerMappingTest.java
@@ -54,9 +54,7 @@ public class FactDistinctColumnsReducerMappingTest extends LocalFileMetadataTest
         TblColRef aUHC = cube.getModel().findColumn("TEST_COUNT_DISTINCT_BITMAP");
 
         FactDistinctColumnsReducerMapping mapping = new FactDistinctColumnsReducerMapping(cube);
-        //System.out.println(mapping.getAllDictCols());
-        //System.out.println(Arrays.toString(mapping.getAllRolePlaysForReducers()));
-
+        
         int totalReducerNum = mapping.getTotalReducerNum();
         Assert.assertEquals(2, mapping.getCuboidRowCounterReducerNum());
         
diff --git a/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java
new file mode 100644
index 0000000..1634843
--- /dev/null
+++ b/kylin/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/DictColDeduperTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ */
+public class DictColDeduperTest {
+
+    @Test
+    public void testBasics() {
+        DictColDeduper dd = new DictColDeduper(50, 0);
+        
+        dd.setIsDictCol(0);
+        dd.setIsDictCol(2);
+        
+        Assert.assertTrue(dd.isDictCol(0));
+        Assert.assertTrue(!dd.isDictCol(1));
+        Assert.assertTrue(dd.isDictCol(2));
+        Assert.assertTrue(!dd.isDictCol(3));
+        
+        Assert.assertTrue(dd.add(0, "abc"));
+        dd.resetIfShortOfMem();
+        Assert.assertTrue(!dd.add(0, "abc"));
+        
+        Assert.assertTrue(dd.add(2, "abc"));
+        dd.resetIfShortOfMem();
+        Assert.assertTrue(!dd.add(2, "abc"));
+    }
+    
+    @Test
+    public void testReset() {
+        DictColDeduper dd = new DictColDeduper(50, Integer.MAX_VALUE);
+        
+        dd.setIsDictCol(0);
+        dd.setIsDictCol(2);
+        
+        Assert.assertTrue(dd.add(0, "abc"));
+        Assert.assertTrue(dd.add(2, "abc"));
+        
+        dd.resetIfShortOfMem();
+        
+        Assert.assertTrue(dd.add(0, "abc"));
+        Assert.assertTrue(dd.add(2, "abc"));
+    }
+}