You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/06 03:51:53 UTC

[kylin] branch revert-1679-kylin-on-parquet-v2 created (now d92e408)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch revert-1679-kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git.


      at d92e408  Revert "Remove unused property, class and maven module which is for Kylin 3 only"

This branch includes the following new commits:

     new d92e408  Revert "Remove unused property, class and maven module which is for Kylin 3 only"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[kylin] 01/01: Revert "Remove unused property, class and maven module which is for Kylin 3 only"

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch revert-1679-kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d92e40878612fe1d4297169365dee0642ec45bb4
Author: Xiaoxiang Yu <xx...@apache.org>
AuthorDate: Tue Jul 6 11:50:58 2021 +0800

    Revert "Remove unused property, class and maven module which is for Kylin 3 only"
    
    This reverts commit e9291e3b232cb628bbfa8dc9fc21fbab1d491a19.
---
 .../kylin/engine/mr/SortedColumnDFSFile.java       |    6 +-
 .../engine/mr/common/MapReduceExecutable.java      |    6 +
 .../kylin/engine/mr/common/MapReduceUtil.java      |  267 ++--
 .../org/apache/kylin/common/KylinConfigBase.java   |  813 ++++++++----
 .../apache/kylin/common/annotation/ConfigTag.java  |    3 -
 .../org/apache/kylin/common/util/HadoopUtil.java   |    4 +
 .../org/apache/kylin/common/KylinConfigTest.java   |   20 +-
 core-cube/pom.xml                                  |    8 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |  341 ++---
 .../java/org/apache/kylin/cube/CubeSegment.java    |   34 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     |  187 +++
 .../apache/kylin/cube/cli/DumpDictionaryCLI.java   |   62 +
 .../java/org/apache/kylin/cube/model/CubeDesc.java |   46 +-
 .../apache/kylin/cube/model/SnapshotTableDesc.java |   10 +-
 .../cube/model/validation/rule/DictionaryRule.java |    6 +
 .../org/apache/kylin/cube/util/CubingUtils.java    |  284 ++--
 .../model/validation/rule/DictionaryRuleTest.java  |   14 +
 .../org/apache/kylin/dict/DictionaryGenerator.java |  382 +++---
 .../apache/kylin/dict/lookup}/ILookupTable.java    |    2 +-
 .../apache/kylin/dict/lookup/SnapshotManager.java  |   12 +-
 .../java/org/apache/kylin/job/JoinedFlatTable.java |   22 +-
 .../kylin/dimension/DimensionEncodingFactory.java  |    1 -
 .../org/apache/kylin/storage/StorageFactory.java   |   13 +
 .../storage/gtrecord/CubeScanRangePlanner.java     |   15 +-
 .../kylin/storage/gtrecord/CubeSegmentScanner.java |    6 +
 .../kylin/storage/gtrecord/CubeTupleConverter.java |    4 +-
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |    2 +-
 .../storage/translate/DerivedFilterTranslator.java |    2 +-
 .../kylin/storage/gtrecord/DictGridTableTest.java  | 1380 ++++++++++----------
 pom.xml                                            |  122 +-
 .../query/enumerator/LookupTableEnumerator.java    |  147 +++
 .../apache/kylin/query/enumerator/OLAPQuery.java   |    4 +
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |    3 +-
 .../query/relnode/visitor/TupleFilterVisitor.java  |    5 +-
 .../kylin/rest/controller/CubeController.java      |   30 +-
 .../org/apache/kylin/rest/service/CubeService.java |   30 +-
 .../apache/kylin/rest/service/TableService.java    |   61 +-
 .../apache/kylin/source/hive/HiveInputBase.java    |   33 +-
 38 files changed, 2673 insertions(+), 1714 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
index 507898d..bcf4b98 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.dict.ByteComparator;
+import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
@@ -90,9 +92,9 @@ public class SortedColumnDFSFile implements IReadableTable {
     }
 
     private Comparator<String> getComparatorByType(DataType type) {
-        Comparator<String> comparator = null;
+        Comparator<String> comparator;
         if (!type.isNumberFamily()) {
-//            comparator = new ByteComparator<>(new StringBytesConverter());
+            comparator = new ByteComparator<>(new StringBytesConverter());
         } else if (type.isIntegerFamily()) {
             comparator = new Comparator<String>() {
                 @Override
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index c6c80f4..4978fa0 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -500,6 +500,12 @@ public class MapReduceExecutable extends AbstractExecutable {
         for (Map.Entry<String, String> entry : configOverride.getMRConfigOverride().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
+        if (conf.get("mapreduce.job.is-mem-hungry") != null
+                && Boolean.parseBoolean(conf.get("mapreduce.job.is-mem-hungry"))) {
+            for (Map.Entry<String, String> entry : configOverride.getMemHungryConfigOverride().entrySet()) {
+                conf.set(entry.getKey(), entry.getValue());
+            }
+        }
 
         if (StringUtils.isNotBlank(cubeName)) {
             remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME);
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 4fa025b..ecde4aa 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -18,130 +18,147 @@
 
 package org.apache.kylin.engine.mr.common;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
 public class MapReduceUtil {
-//
-//    private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
-//
-//    /**
-//     * @return reducer number for calculating hll
-//     */
-//    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
-//        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
-//        int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
-//
-//        int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
-//        if (shardBase > hllMaxReducerNumber) {
-//            shardBase = hllMaxReducerNumber;
-//        }
-//        return shardBase;
-//    }
-//
-//    /**
-//     * @param cuboidScheduler specified can provide more flexibility
-//     * */
-//    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
-//            double totalMapInputMB, int level)
-//            throws ClassNotFoundException, IOException, InterruptedException, JobException {
-//        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-//        KylinConfig kylinConfig = cubeDesc.getConfig();
-//
-//        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-//        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-//        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
-//                + level);
-//
-//        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
-//
-//        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-//
-//        if (level == -1) {
-//            //merge case
-//            double estimatedSize = cubeStatsReader.estimateCubeSize();
-//            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
-//            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
-//                    totalMapInputMB, adjustedCurrentLayerSizeEst);
-//        } else if (level == 0) {
-//            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
-//            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-//            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-//        } else {
-//            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-//            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-//            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-//            logger.debug(
-//                    "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
-//                    totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-//        }
-//
-//        // number of reduce tasks
-//        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-//
-//        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-//        if (cubeDesc.hasMemoryHungryMeasures()) {
-//            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
-//            numReduceTasks = numReduceTasks * 4;
-//        }
-//
-//        // at least 1 reducer by default
-//        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-//        // no more than 500 reducer by default
-//        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-//
-//        return numReduceTasks;
-//    }
-//
-//    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
-//            throws IOException {
-//        KylinConfig kylinConfig = cubeSeg.getConfig();
-//
-//        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
-//        double totalSizeInM = 0;
-//        for (Double cuboidSize : cubeSizeMap.values()) {
-//            totalSizeInM += cuboidSize;
-//        }
-//        return getReduceTaskNum(totalSizeInM, kylinConfig);
-//    }
-//
-//    // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
-//    public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
-//        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
-//
-//        Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
-//        overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
-//        overlapCuboids.add(baseCuboidId);
-//
-//        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
-//                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
-//        Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
-//                cuboidStats.getSecond());
-//        double totalSizeInM = 0;
-//        for (Double cuboidSize : cubeSizeMap.values()) {
-//            totalSizeInM += cuboidSize;
-//        }
-//
-//        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
-//
-//        KylinConfig kylinConfig = cubeSeg.getConfig();
-//        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
-//        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
-//        return new Pair<>(nBase + nOther, nBase);
-//    }
-//
-//    private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
-//        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-//        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-//
-//        // number of reduce tasks
-//        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
-//
-//        // at least 1 reducer by default
-//        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-//        // no more than 500 reducer by default
-//        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-//
-//        logger.info("Having total map input MB " + Math.round(totalSizeInM));
-//        logger.info("Having per reduce MB " + perReduceInputMB);
-//        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
-//        return numReduceTasks;
-//    }
+
+    private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
+
+    /**
+     * @return reducer number for calculating hll
+     */
+    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
+        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
+        int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
+
+        int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
+        if (shardBase > hllMaxReducerNumber) {
+            shardBase = hllMaxReducerNumber;
+        }
+        return shardBase;
+    }
+
+    /**
+     * @param cuboidScheduler specified can provide more flexibility
+     * */
+    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
+            double totalMapInputMB, int level)
+            throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+                + level);
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+
+        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+        if (level == -1) {
+            //merge case
+            double estimatedSize = cubeStatsReader.estimateCubeSize();
+            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
+                    totalMapInputMB, adjustedCurrentLayerSizeEst);
+        } else if (level == 0) {
+            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else {
+            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+            logger.debug(
+                    "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+                    totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+        }
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        return numReduceTasks;
+    }
+
+    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
+            throws IOException {
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+        return getReduceTaskNum(totalSizeInM, kylinConfig);
+    }
+
+    // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
+    public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
+        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
+
+        Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+        overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
+        overlapCuboids.add(baseCuboidId);
+
+        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
+                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
+        Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
+                cuboidStats.getSecond());
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+
+        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
+
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
+        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
+        return new Pair<>(nBase + nOther, nBase);
+    }
+
+    private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        logger.info("Having total map input MB " + Math.round(totalSizeInM));
+        logger.info("Having per reduce MB " + perReduceInputMB);
+        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+        return numReduceTasks;
+    }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index fc76506..39f8eae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -292,14 +292,6 @@ public abstract class KylinConfigBase implements Serializable {
                 || "LOCAL".equals(getOptional("kylin.env", "DEV"));
     }
 
-    public boolean isUTEnv() {
-        return "UT".equals(getDeployEnv());
-    }
-
-    public boolean isLocalEnv() {
-        return "LOCAL".equals(getDeployEnv());
-    }
-
     public String getDeployEnv() {
         return getOptional("kylin.env", "DEV");
     }
@@ -351,7 +343,7 @@ public abstract class KylinConfigBase implements Serializable {
         String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
 
         if (root == null) {
-            return getHdfsWorkingDirectory();
+            return getJdbcHdfsWorkingDirectory();
         }
 
         Path path = new Path(root);
@@ -360,8 +352,12 @@ public abstract class KylinConfigBase implements Serializable {
                     "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
 
         // make sure path is qualified
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        path = fs.makeQualified(path);
+        try {
+            FileSystem fs = HadoopUtil.getReadFileSystem();
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
 
         root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
 
@@ -378,6 +374,28 @@ public abstract class KylinConfigBase implements Serializable {
         return cachedBigCellDirectory;
     }
 
+    public String getReadHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getHBaseClusterFs())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory());
+            return new Path(getHBaseClusterFs(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+        }
+
+        return getHdfsWorkingDirectory();
+    }
+
+    private String getJdbcHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
+            Path workingDir = new Path(getReadHdfsWorkingDirectory());
+            return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+        }
+
+        return getReadHdfsWorkingDirectory();
+    }
+
+    private String getJdbcFileSystem() {
+        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+    }
+
     public String getHdfsWorkingDirectory(String project) {
         if (isProjectIsolationEnabled() && project != null) {
             return new Path(getHdfsWorkingDirectory(), project).toString() + "/";
@@ -461,7 +479,7 @@ public abstract class KylinConfigBase implements Serializable {
         Map<String, String> r = Maps.newLinkedHashMap();
         // ref constants in ISourceAware
         r.put("", "org.apache.kylin.common.persistence.FileResourceStore");
-//        r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
+        r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
         r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
         r.put("ifile", "org.apache.kylin.common.persistence.IdentifierFileResourceStore");
         r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
@@ -516,7 +534,6 @@ public abstract class KylinConfigBase implements Serializable {
         return (DistributedLockFactory) ClassUtil.newInstance(clsName);
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHBaseMappingAdapter() {
         return getOptional("kylin.metadata.hbasemapping-adapter");
     }
@@ -525,17 +542,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.metadata.check-copy-on-write", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseClientScannerTimeoutPeriod() {
         return getOptional("kylin.metadata.hbase-client-scanner-timeout-period", "10000");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseRpcTimeout() {
         return getOptional("kylin.metadata.hbase-rpc-timeout", "5000");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseClientRetriesNumber() {
         return getOptional("kylin.metadata.hbase-client-retries-number", "1");
     }
@@ -548,31 +562,119 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.metadata.able-change-string-to-datetime", "false"));
     }
 
-    public String getMetadataDialect() {
-        return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    // ============================================================================
+    // DICTIONARY & SNAPSHOT
+    // ============================================================================
+
+    public boolean isUseForestTrieDictionary() {
+        return Boolean.parseBoolean(getOptional("kylin.dictionary.use-forest-trie", TRUE));
     }
 
-    public boolean isJsonAlwaysSmallCell() {
-        return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
+    public long getTrieDictionaryForestMaxTrieSizeMB() {
+        return Integer.parseInt(getOptional("kylin.dictionary.forest-trie-max-mb", "500"));
     }
 
-    public int getSmallCellMetadataWarningThreshold() {
-        return Integer.parseInt(
-                getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
+    public long getCachedDictMaxEntrySize() {
+        return Long.parseLong(getOptional("kylin.dictionary.max-cache-entry", "3000"));
     }
 
-    public int getSmallCellMetadataErrorThreshold() {
-        return Integer.parseInt(
-                getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+    public int getCachedDictMaxSize() {
+        return Integer.parseInt(getOptional("kylin.dictionary.max-cache-size", "-1"));
     }
 
-    public int getJdbcResourceStoreMaxCellSize() {
-        return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+    public boolean isGrowingDictEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.growing-enabled", FALSE));
+    }
+
+    public boolean isDictResuable() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.resuable", FALSE));
+    }
+
+    public long getCachedDictionaryMaxEntrySize() {
+        return Long.parseLong(getOptional("kylin.dictionary.cached-dict-max-cache-entry", "50000"));
+    }
+
+    public int getAppendDictEntrySize() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-entry-size", "10000000"));
+    }
+
+    public int getAppendDictMaxVersions() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-max-versions", "3"));
+    }
+
+    public int getAppendDictVersionTTL() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-version-ttl", "259200000"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getCachedSnapshotMaxEntrySize() {
+        return Integer.parseInt(getOptional("kylin.snapshot.max-cache-entry", "500"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getTableSnapshotMaxMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getExtTableSnapshotShardingMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String getExtTableSnapshotLocalCachePath() {
+        return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public double getExtTableSnapshotLocalCacheMaxSizeGB() {
+        return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public long getExtTableSnapshotLocalCacheCheckVolatileRange() {
+        return Long.parseLong(getOptional("kylin.snapshot.ext.local.cache.check.volatile", "3600000"));
     }
 
+    @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.CUBE_LEVEL})
+    public boolean isShrunkenDictFromGlobalEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", TRUE));
+    }
+
+
     // ============================================================================
-    // DICTIONARY & SNAPSHOT
+    // Hive Global Dictionary
+    //
+    // ============================================================================
+
+    /**
+     * @return if mr-hive dict not enabled, return empty array
+     * else return array contains "{TABLE_NAME}_{COLUMN_NAME}"
+     */
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String[] getMrHiveDictColumns() {
+        String columnStr = getOptional("kylin.dictionary.mr-hive.columns", "");
+        if (!columnStr.equals("")) {
+            return columnStr.split(",");
+        }
+        return new String[0];
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String getMrHiveDictDB() {
+        return getOptional("kylin.dictionary.mr-hive.database", getHiveDatabaseForIntermediateTable());
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String getMrHiveDictTableSuffix() {
+        return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
+    }
+
     // ============================================================================
+    // Distributed/Spark Global dictionary
+    // Add wiki link here
+    // ============================================================================
+
 
     public int getGlobalDictV2MinHashPartitions() {
         return Integer.parseInt(getOptional("kylin.dictionary.globalV2-min-hash-partitions", "10"));
@@ -606,42 +708,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.dictionary.globalV2-check", "true"));
     }
 
-    /*
-     * Detect dataset skew in dictionary encode step.
-     * */
-    public boolean detectDataSkewInDictEncodingEnabled() {
-        return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
-    }
-
-    /*
-     * In some data skew cases, the repartition step during dictionary encoding will be slow.
-     * We can choose to sample from the dataset to detect skewed. This configuration is used to set the sample rate.
-     * */
-    public double sampleRateInEncodingSkewDetection() {
-        return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
-    }
-
-    /*
-     * In KYLIN4, dictionaries are hashed into several buckets, column data are repartitioned by the same hash algorithm
-     * during encoding step too. In data skew cases, the repartition step will be very slow. Kylin will automatically
-     * sample from the source to detect skewed data and repartition these skewed data to random partitions.
-     * This configuration is used to set the skew data threshhold, valued from 0 to 1.
-     * e.g.
-     *   if you set this value to 0.05, for each value that takes up more than 5% percent of the total will be regarded
-     *   as skew data, as a result the skewed data will be no more than 20 records
-     * */
-    public double skewPercentageThreshHold() {
-        return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
-    }
-
-    public boolean isSnapshotParallelBuildEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
-    }
-
-    public int snapshotParallelBuildTimeoutSeconds() {
-        return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
-    }
-
     // ============================================================================
     // CUBE
     // ============================================================================
@@ -973,6 +1039,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.job.error-record-threshold", "0"));
     }
 
+    public boolean isAdvancedFlatTableUsed() {
+        return Boolean.parseBoolean(getOptional("kylin.job.use-advanced-flat-table", FALSE));
+    }
+
+    public String getAdvancedFlatTableClass() {
+        return getOptional("kylin.job.advanced-flat-table.class");
+    }
+
     public String getJobTrackingURLPattern() {
         return getOptional("kylin.job.tracking-url-pattern", "");
     }
@@ -1032,7 +1106,7 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    @Deprecated
     public String getHiveUrl() {
         return getOptional("kylin.source.hive.connection-url", "");
     }
@@ -1040,7 +1114,7 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    @Deprecated
     public String getHiveUser() {
         return getOptional("kylin.source.hive.connection-user", "");
     }
@@ -1048,87 +1122,71 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    @Deprecated
     public String getHivePassword() {
         return getOptional("kylin.source.hive.connection-password", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getHiveConfigOverride() {
         return getPropertiesByPrefix("kylin.source.hive.config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("kylin.source.hive.table-location." + table.toUpperCase(Locale.ROOT));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isHiveKeepFlatTable() {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.keep-flat-table", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveDatabaseForIntermediateTable() {
         return CliCommandExecutor.checkHiveProperty(this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatTableStorageFormat() {
         return this.getOptional("kylin.source.hive.flat-table-storage-format", "SEQUENCEFILE");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatTableFieldDelimiter() {
         return this.getOptional("kylin.source.hive.flat-table-field-delimiter", "\u001F");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isHiveRedistributeEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.redistribute-flat-table", TRUE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveClientMode() {
         return getOptional("kylin.source.hive.client", "cli");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveBeelineShell() {
         return getOptional("kylin.source.hive.beeline-shell", "beeline");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveBeelineParams() {
         return getOptional("kylin.source.hive.beeline-params", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean getEnableSparkSqlForTableOps() {
         return Boolean.parseBoolean(getOptional("kylin.source.hive.enable-sparksql-for-table-ops", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkSqlBeelineShell() {
         return getOptional("kylin.source.hive.sparksql-beeline-shell", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkSqlBeelineParams() {
         return getOptional("kylin.source.hive.sparksql-beeline-params", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean getHiveTableDirCreateFirst() {
         return Boolean.parseBoolean(getOptional("kylin.source.hive.table-dir-create-first", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatHiveTableClusterByDictColumn() {
         return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHiveRedistributeColumnCount() {
         return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3"));
     }
@@ -1185,7 +1243,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // SOURCE.KAFKA(Removed)
+    // SOURCE.KAFKA
     // ============================================================================
 
     @ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1194,7 +1252,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // SOURCE.JDBC(Removed)
+    // SOURCE.JDBC
     // ============================================================================
 
     @ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1253,7 +1311,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // STORAGE.HBASE(Removed)
+    // STORAGE.HBASE
     // ============================================================================
 
     public Map<Integer, String> getStorageEngines() {
@@ -1463,7 +1521,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // ENGINE.MR(Removed)
+    // ENGINE.MR
     // ============================================================================
 
     public Map<Integer, String> getJobEngines() {
@@ -1482,6 +1540,8 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.default", "6"));
     }
 
+
+
     public String getKylinJobJarPath() {
         final String jobJar = getOptional(KYLIN_ENGINE_MR_JOB_JAR);
         if (StringUtils.isNotEmpty(jobJar)) {
@@ -1499,63 +1559,76 @@ public abstract class KylinConfigBase implements Serializable {
         System.setProperty(KYLIN_ENGINE_MR_JOB_JAR, path);
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getKylinJobMRLibDir() {
         return getOptional("kylin.engine.mr.lib-dir", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getMRConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.mr.config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getHadoopJobMinReducerNumber() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
+    // used for some mem-hungry step
+    public Map<String, String> getMemHungryConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.mr.mem-hungry-config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getHadoopJobMaxReducerNumber() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
+    public Map<String, String> getUHCMRConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getHadoopJobMapperInputRows() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
+    public Map<String, String> getBaseCuboidMRConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override.");
+    }
+
+    public Map<String, String> getSparkConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getFlinkConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.flink-conf.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
         return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
         return getPropertiesByPrefix("kylin.engine.flink-conf-" + configName + ".");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public double getDefaultHadoopJobReducerInputMB() {
+        return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
+    }
+
+    public double getDefaultHadoopJobReducerCountRatio() {
+        return Double.parseDouble(getOptional("kylin.engine.mr.reduce-count-ratio", "1.0"));
+    }
+
+    public int getHadoopJobMinReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
+    }
+
+    public int getHadoopJobMaxReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
+    }
+
+    public int getHadoopJobMapperInputRows() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
+    }
+
     public int getCuboidStatsCalculatorMaxNumber() {
         // set 1 to disable multi-thread statistics calculation
         return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getCuboidNumberPerStatsCalculator() {
         return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHadoopJobPerReducerHLLCuboidNumber() {
         return Integer.parseInt(getOptional("kylin.engine.mr.per-reducer-hll-cuboid-number", "100"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHadoopJobHLLMaxReducerNumber() {
         // by default multi-reducer hll calculation is disabled
         return Integer.parseInt(getOptional("kylin.engine.mr.hll-max-reducer-number", "1"));
@@ -1566,27 +1639,22 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "3"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isBuildUHCDictWithMREnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict-in-additional-step", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isBuildDictInReducerEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", TRUE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getYarnStatusCheckUrl() {
         return getOptional("kylin.engine.mr.yarn-check-status-url", null);
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getYarnStatusCheckIntervalSeconds() {
         return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isUseLocalClasspathEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.use-local-classpath", TRUE));
     }
@@ -1595,114 +1663,122 @@ public abstract class KylinConfigBase implements Serializable {
      * different version hive use different UNION style
      * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveUnionStyle() {
         return getOptional("kylin.hive.union.style", "UNION");
     }
 
     // ============================================================================
-    // ENGINE.SPARK (DEPRECATED)
+    // ENGINE.SPARK
     // ============================================================================
 
     public String getHadoopConfDir() {
         return getOptional("kylin.env.hadoop-conf-dir", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    /**
+     * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
+     */
+    public String getSparderAppName() {
+        String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
+        if (StringUtils.isEmpty(customSparderAppName)) {
+            customSparderAppName =
+                    "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
+        }
+        return customSparderAppName;
+    }
+
     public String getSparkAdditionalJars() {
         return getOptional("kylin.engine.spark.additional-jars", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlinkAdditionalJars() {
         return getOptional("kylin.engine.flink.additional-jars", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public float getSparkRDDPartitionCutMB() {
         return Float.parseFloat(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public float getFlinkPartitionCutMB() {
         return Float.parseFloat(getOptional("kylin.engine.flink.partition-cut-mb", "10.0"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getSparkMinPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.min-partition", "1"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getFlinkMinPartition() {
         return Integer.parseInt(getOptional("kylin.engine.flink.min-partition", "1"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getSparkMaxPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getFlinkMaxPartition() {
         return Integer.parseInt(getOptional("kylin.engine.flink.max-partition", "5000"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkStorageLevel() {
         return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isSparkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public boolean isSparkFactDistinctEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
+    }
+
+    public boolean isSparkUHCDictionaryEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", "false"));
+    }
+
+    public boolean isSparkCardinalityEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
+    }
+
     public int getSparkOutputMaxSize() {
         return Integer.valueOf(getOptional("kylin.engine.spark.output.max-size", "10485760"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isSparkDimensionDictionaryEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isFlinkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
     }
 
+    public boolean isSparCreateHiveTableViaSparkEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", "false"));
+    }
+
     // ============================================================================
     // ENGINE.LIVY
     // ============================================================================
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isLivyEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.livy-conf.livy-enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getLivyRestApiBacktick() {
         return getOptional("kylin.engine.livy.backtick.quote", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getLivyUrl() {
         return getOptional("kylin.engine.livy-conf.livy-url");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyKey() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-key.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyArr() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-arr.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyMap() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
     }
@@ -1763,7 +1839,6 @@ public abstract class KylinConfigBase implements Serializable {
 
     // check KYLIN-3358, need deploy coprocessor if enabled
     // finally should be deprecated
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isDynamicColumnEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.enable-dynamic-column", FALSE));
     }
@@ -1818,13 +1893,11 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public long getQueryMaxScanBytes() {
         long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
         return value > 0 ? value : Long.MAX_VALUE;
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public long getQueryMaxReturnRows() {
         return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000"));
     }
@@ -1942,7 +2015,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct", "5000000"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getUDFs() {
         Map<String, String> udfMap = Maps.newLinkedHashMap();
         udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
@@ -1975,12 +2047,10 @@ public abstract class KylinConfigBase implements Serializable {
         return this.getOptional("kylin.query.schema-factory", "org.apache.kylin.query.schema.OLAPSchemaFactory");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getPushDownRunnerClassName() {
         return getOptional("kylin.query.pushdown.runner-class-name", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public List<String> getPushDownRunnerIds() {
         List<String> ids = Lists.newArrayList();
         String idsStr = getOptional("kylin.query.pushdown.runner.ids", "");
@@ -1992,7 +2062,6 @@ public abstract class KylinConfigBase implements Serializable {
         return ids;
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String[] getPushDownConverterClassNames() {
         return getOptionalStringArray("kylin.query.pushdown.converter-class-names",
                 new String[]{"org.apache.kylin.source.adhocquery.HivePushDownConverter"});
@@ -2002,7 +2071,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.query.pushdown.cache-enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcUrl(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.url", "");
@@ -2011,7 +2079,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcDriverClass(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.driver", "");
@@ -2020,7 +2087,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcUsername(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.username", "");
@@ -2029,7 +2095,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcPassword(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.password", "");
@@ -2038,7 +2103,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMaxTotal(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2051,7 +2115,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMaxIdle(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2064,7 +2127,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMinIdle(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2378,23 +2440,221 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.tool.auto-migrate-cube.dest-config", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public String getJdbcSourceAdaptor() {
-        return getOptional("kylin.source.jdbc.adaptor");
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public boolean isLimitPushDownEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
-    }
-
     // ============================================================================
-    // Realtime streaming (Removed)
+    // jdbc metadata resource store
     // ============================================================================
 
+    public String getMetadataDialect() {
+        return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    }
+
+    public boolean isJsonAlwaysSmallCell() {
+        return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
+    }
+
+    public int getSmallCellMetadataWarningThreshold() {
+        return Integer.parseInt(
+                getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
+    }
+
+    public int getSmallCellMetadataErrorThreshold() {
+        return Integer.parseInt(
+                getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+    }
+
+    public int getJdbcResourceStoreMaxCellSize() {
+        return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+    }
+
+    public String getJdbcSourceAdaptor() {
+        return getOptional("kylin.source.jdbc.adaptor");
+    }
+
+    public boolean isLimitPushDownEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
+    }
+
+    // ============================================================================
+    // Realtime streaming
+    // ============================================================================
+    public String getStreamingStoreClass() {
+        return getOptional("kylin.stream.store.class",
+                "org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore");
+    }
+
+    public String getStreamingBasicCuboidJobDFSBlockSize() {
+        return getOptional("kylin.stream.job.dfs.block.size", String.valueOf(16 * 1024 * 1024));
+    }
+
+    public String getStreamingIndexPath() {
+        return getOptional("kylin.stream.index.path", "stream_index");
+    }
+
+    public int getStreamingCubeConsumerTasksNum() {
+        return Integer.parseInt(getOptional("kylin.stream.cube-num-of-consumer-tasks", "3"));
+    }
+
+    public int getStreamingCubeWindowInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.cube.window", "3600"));
+    }
+
+    public int getStreamingCubeDurationInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.cube.duration", "7200"));
+    }
+
+    public int getStreamingCubeMaxDurationInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.cube.duration.max", "43200"));
+    }
+
+    public int getStreamingCheckPointFileMaxNum() {
+        return Integer.parseInt(getOptional("kylin.stream.checkpoint.file.max.num", "5"));
+    }
+
+    public int getStreamingCheckPointIntervalsInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.index.checkpoint.intervals", "300"));
+    }
+
+    public int getStreamingIndexMaxRows() {
+        return Integer.parseInt(getOptional("kylin.stream.index.maxrows", "50000"));
+    }
+
+    public int getStreamingMaxImmutableSegments() {
+        return Integer.parseInt(getOptional("kylin.stream.immutable.segments.max.num", "100"));
+    }
+
+    public boolean isStreamingConsumeFromLatestOffsets() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.consume.offsets.latest", "true"));
+    }
+
+    public String getStreamingNode() {
+        return getOptional("kylin.stream.node", null);
+    }
+
+    public Map<String, String> getStreamingNodeProperties() {
+        return getPropertiesByPrefix("kylin.stream.node");
+    }
+
+    public String getStreamingMetadataStoreType() {
+        return getOptional("kylin.stream.metadata.store.type", "zk");
+    }
+
+    public String getStreamingSegmentRetentionPolicy() {
+        return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
+    }
+
+    public String getStreamingAssigner() {
+        return getOptional("kylin.stream.assigner", "DefaultAssigner");
+    }
+
+    public int getCoordinatorHttpClientTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.coordinator.client.timeout.millsecond", "5000"));
+    }
+
+    public int getReceiverHttpClientTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.client.timeout.millsecond", "5000"));
+    }
+
+    public int getStreamingReceiverHttpMaxThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.http.max.threads", "200"));
+    }
+
+    public int getStreamingReceiverHttpMinThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.http.min.threads", "10"));
+    }
+
+    public int getStreamingReceiverQueryCoreThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
+    }
+
+    public int getStreamingReceiverQueryMaxThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
+    }
+
+    public int getStreamingReceiverUseThreadsPerQuery() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.use-threads-per-query", "8"));
+    }
+
+    public int getStreamingRPCHttpConnTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.rpc.http.connect.timeout", "10000"));
+    }
+
+    public int getStreamingRPCHttpReadTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.rpc.http.read.timeout", "60000"));
+    }
+
+    public boolean isStreamingBuildAdditionalCuboids() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.build.additional.cuboids", "false"));
+    }
+
+    public Map<String, String> getStreamingSegmentRetentionPolicyProperties(String policyName) {
+        return getPropertiesByPrefix("kylin.stream.segment.retention.policy." + policyName + ".");
+    }
+
+    public int getStreamingMaxFragmentsInSegment() {
+        return Integer.parseInt(getOptional("kylin.stream.segment-max-fragments", "50"));
+    }
+
+    public int getStreamingMinFragmentsInSegment() {
+        return Integer.parseInt(getOptional("kylin.stream.segment-min-fragments", "15"));
+    }
+
+    public int getStreamingMaxFragmentSizeInMb() {
+        return Integer.parseInt(getOptional("kylin.stream.max-fragment-size-mb", "300"));
+    }
+
+    public boolean isStreamingFragmentsAutoMergeEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.fragments-auto-merge-enable", "true"));
+    }
+
+    public boolean isStreamingConcurrentScanEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.segment.concurrent.scan", "false"));
+    }
+
+    public boolean isStreamingStandAloneMode() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.stand-alone.mode", "false"));
+    }
+
+    public boolean isNewCoordinatorEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.new.coordinator-enabled", "true"));
+    }
+
+    public String getLocalStorageImpl() {
+        return getOptional("kylin.stream.settled.storage", null);
+    }
+
+    public String getStreamMetrics() {
+        return getOptional("kylin.stream.metrics.option", "");
+    }
+
+    /**
+     * whether to print encode integer value for count distinct string value, only for debug/test purpose
+     */
+    public boolean isPrintRealtimeDictEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.print-realtime-dict-enabled", "false"));
+    }
+
+    public long getStreamMetricsInterval() {
+        return Long.parseLong(getOptional("kylin.stream.metrics.interval", "5"));
+    }
+
+    /**
+     * whether realtime query should add timezone offset by kylin's web-timezone, please refer to KYLIN-4010 for detail
+     */
+    public String getStreamingDerivedTimeTimezone() {
+        return (getOptional("kylin.stream.event.timezone", ""));
+    }
+
+    public boolean isAutoResubmitDiscardJob() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
+    }
+
+    public String getHiveDatabaseLambdaCube() {
+        return this.getOptional("kylin.stream.hive.database-for-lambda-cube", DEFAULT);
+    }
+
+    // ============================================================================
+    // Health Check CLI
     // ============================================================================
-    // Health Check CLI
-    // ============================================================================
 
     public int getWarningSegmentNum() {
         return Integer.parseInt(getOptional("kylin.tool.health-check.warning-segment-num", "-1"));
@@ -2417,7 +2677,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // ENGINE.SPARK (Kylin 4)
+    // Kylin 4.x related
     // ============================================================================
 
     public String getKylinParquetJobJarPath() {
@@ -2516,106 +2776,22 @@ public abstract class KylinConfigBase implements Serializable {
         return new Path(path);
     }
 
-    public Map<String, String> getSparkConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.spark-conf.");
-    }
-
-    public boolean isAutoSetSparkConf() {
-        return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
-    }
-
-    public String getBuildConf() {
-        return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
-    }
-
-    public boolean isJobLogPrintEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
-    public String getClusterInfoFetcherClassName() {
-        return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
-                "org.apache.kylin.cluster.YarnInfoFetcher");
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
-    public String getSparkMergeClassName() {
-        return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
-    }
-
-    public String getParentDatasetStorageLevel() {
-        return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
-    }
-
-    public int getMaxParentDatasetPersistCount() {
-        return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
-    }
-
-    public int getRepartitionNumAfterEncode() {
-        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
-    }
-
-
-    public int getSparkEngineMaxRetryTime() {
-        return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
-    }
-
-    public double getSparkEngineRetryMemoryGradient() {
-        return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
-    }
-
-    public double getSparkEngineRetryOverheadMemoryGradient() {
-        return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
-    }
-
-    public Double getMaxAllocationResourceProportion() {
-        return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
-    }
-
-    public int getSparkEngineBaseExecutorInstances() {
-        return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
-    }
-
-    public String getSparkEngineRequiredTotalCores() {
-        return getOptional("kylin.engine.spark.required-cores", "1");
-    }
-
-    public String getSparkEngineExecutorInstanceStrategy() {
-        return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
+    public boolean isSnapshotParallelBuildEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
     }
 
-    public int getSnapshotShardSizeMB() {
-        return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+    public boolean isUTEnv() {
+        return "UT".equals(getDeployEnv());
     }
 
-    /***
-     * Global dictionary will be split into several buckets. To encode a column to int value more
-     * efficiently, source dataset will be repartitioned by the to-be encoded column to the same
-     * amount of partitions as the dictionary's bucket size.
-     *
-     * It sometimes bring side effect, because repartitioning by a single column is more likely to cause
-     * serious data skew, causing one task takes the majority of time in first layer's cuboid building.
-     *
-     * When faced with this case, you can try repartitioning encoded dataset by all
-     * RowKey columns to avoid data skew. The repartition size is default to max bucket
-     * size of all dictionaries, but you can also set to other flexible value by this option:
-     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
-     ***/
-    public boolean rePartitionEncodedDatasetWithRowKey() {
-        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+    public boolean isLocalEnv() {
+        return "LOCAL".equals(getDeployEnv());
     }
 
-    /**
-     * If we should calculate cuboid statistics for each segment, which is needed for cube planner phase two
-     */
-    public boolean isSegmentStatisticsEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+    public int snapshotParallelBuildTimeoutSeconds() {
+        return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
     }
 
-    // ============================================================================
-    // STORAGE.PARQUET
-    // ============================================================================
-
     public String getStorageProvider() {
         return getOptional("kylin.storage.provider", "org.apache.kylin.common.storage.DefaultStorageProvider");
     }
@@ -2666,21 +2842,61 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.valueOf(getOptional("kylin.storage.columnar.dfs-replication", "3"));
     }
 
-    // ============================================================================
-    // Query Engine (Sparder)
-    // ============================================================================
+    public boolean isAutoSetSparkConf() {
+        return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
+    }
 
-    /**
-     * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
-     */
-    public String getSparderAppName() {
-        String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
-        if (StringUtils.isEmpty(customSparderAppName)) {
-            customSparderAppName =
-                    "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
-        }
-        return customSparderAppName;
+    public String getBuildConf() {
+        return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
+    }
+
+    public boolean isJobLogPrintEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+    public String getClusterInfoFetcherClassName() {
+        return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
+                "org.apache.kylin.cluster.YarnInfoFetcher");
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+    public String getSparkMergeClassName() {
+        return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
+    }
+
+    public int getSparkEngineMaxRetryTime() {
+        return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
     }
+
+    public double getSparkEngineRetryMemoryGradient() {
+        return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
+    }
+
+    public double getSparkEngineRetryOverheadMemoryGradient() {
+        return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
+    }
+
+    public Double getMaxAllocationResourceProportion() {
+        return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
+    }
+
+    public int getSparkEngineBaseExecutorInstances() {
+        return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
+    }
+
+    public String getSparkEngineRequiredTotalCores() {
+        return getOptional("kylin.engine.spark.required-cores", "1");
+    }
+
+    public String getSparkEngineExecutorInstanceStrategy() {
+        return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
+    }
+
+    public int getSnapshotShardSizeMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+    }
+
     /**
      * driver memory that can be used by join(mostly BHJ)
      */
@@ -2852,22 +3068,25 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
     }
 
+    /**
+     * If we should calculate cuboid statistics for each segment, which is needed for cube planner phase two
+     */
+    public boolean isSegmentStatisticsEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+    }
 
     // ============================================================================
     // Spark with Kerberos
     // ============================================================================
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public Boolean isKerberosEnabled() {
         return Boolean.valueOf(getOptional("kylin.kerberos.enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosKeytab() {
         return getOptional("kylin.kerberos.keytab", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosKeytabPath() {
         return KylinConfig.getKylinConfDir() + File.separator + getKerberosKeytab();
     }
@@ -2917,8 +3136,64 @@ public abstract class KylinConfigBase implements Serializable {
         return KylinConfig.getKylinConfDir() + File.separator + getKerberosJaasConf();
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosPrincipal() {
         return getOptional("kylin.kerberos.principal");
     }
+
+    public String getParentDatasetStorageLevel() {
+        return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
+    }
+
+    public int getMaxParentDatasetPersistCount() {
+        return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
+    }
+
+    public int getRepartitionNumAfterEncode() {
+        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
+    }
+
+    /***
+     * Global dictionary will be split into several buckets. To encode a column to int value more
+     * efficiently, source dataset will be repartitioned by the to-be encoded column to the same
+     * amount of partitions as the dictionary's bucket size.
+     *
+     * It sometimes bring side effect, because repartitioning by a single column is more likely to cause
+     * serious data skew, causing one task takes the majority of time in first layer's cuboid building.
+     *
+     * When faced with this case, you can try repartitioning encoded dataset by all
+     * RowKey columns to avoid data skew. The repartition size is default to max bucket
+     * size of all dictionaries, but you can also set to other flexible value by this option:
+     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
+     ***/
+    public boolean rePartitionEncodedDatasetWithRowKey() {
+        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+    }
+
+    /*
+     * Detect dataset skew in dictionary encode step.
+     * */
+    public boolean detectDataSkewInDictEncodingEnabled() {
+        return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
+    }
+
+    /*
+    * In some data skew cases, the repartition step during dictionary encoding will be slow.
+    * We can choose to sample from the dataset to detect skewed. This configuration is used to set the sample rate.
+    * */
+    public double sampleRateInEncodingSkewDetection() {
+        return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
+    }
+
+    /*
+    * In KYLIN4, dictionaries are hashed into several buckets, column data are repartitioned by the same hash algorithm
+    * during encoding step too. In data skew cases, the repartition step will be very slow. Kylin will automatically
+    * sample from the source to detect skewed data and repartition these skewed data to random partitions.
+    * This configuration is used to set the skew data threshhold, valued from 0 to 1.
+    * e.g.
+    *   if you set this value to 0.05, for each value that takes up more than 5% percent of the total will be regarded
+    *   as skew data, as a result the skewed data will be no more than 20 records
+    * */
+    public double skewPercentageThreshHold() {
+        return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
+    }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
index 965d252..53a6967 100644
--- a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
+++ b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
@@ -39,9 +39,6 @@ public @interface ConfigTag {
          */
         DEPRECATED,
 
-        /**
-         * Not tested/verified
-         */
         NOT_CLEAR,
 
         /**
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 9b782de..e5facf4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -101,6 +101,10 @@ public class HadoopUtil {
         return getFileSystem(workingPath, conf);
     }
 
+    public static FileSystem getReadFileSystem() throws IOException {
+        return getFileSystem(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory());
+    }
+
     public static FileSystem getFileSystem(String path) {
         return getFileSystem(new Path(makeURI(path)));
     }
diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
index 29f92a6..b4ac16b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
@@ -76,16 +76,16 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase {
         assertEquals("1234", configExt.getOptional("1234"));
     }
 
-//    @Test
-//    public void testPropertiesHotLoad() {
-//        KylinConfig config = KylinConfig.getInstanceFromEnv();
-//        assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
-//
-//        updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
-//        KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
-//
-//        assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
-//    }
+    @Test
+    public void testPropertiesHotLoad() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
+
+        updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
+        KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
+
+        assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
+    }
 
     @Test
     public void testGetMetadataUrlPrefix() {
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index 91be01b..c024c06 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -38,10 +38,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-metadata</artifactId>
         </dependency>
-<!--        <dependency>-->
-<!--            <groupId>org.apache.kylin</groupId>-->
-<!--            <artifactId>kylin-core-dictionary</artifactId>-->
-<!--        </dependency>-->
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-dictionary</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-shaded-guava</artifactId>
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3e99268..585a37a 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,23 +41,34 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.persistence.WriteConflictException;
 import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil;
 import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -65,6 +77,8 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +129,7 @@ public class CubeManager implements IRealizationProvider {
 
     // a few inner classes to group related methods
     private SegmentAssist segAssist = new SegmentAssist();
+    private DictionaryAssist dictAssist = new DictionaryAssist();
 
     private Random ran = new Random();
 
@@ -541,24 +556,27 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        return null;
+        String tableName = join.getPKSide().getTableIdentity();
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName);
+        return getInMemLookupTable(cubeSegment, join, snapshotTableDesc);
     }
 
-//    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
-//            SnapshotTableDesc snapshotTableDesc) {
-//        String tableName = join.getPKSide().getTableIdentity();
-//        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
-//        String[] pkCols = join.getPrimaryKey();
-//
-//        try {
-//            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-//            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
-//            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
-//        } catch (IOException e) {
-//            throw new IllegalStateException(
-//                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
-//        }
-//    }
+    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
+            SnapshotTableDesc snapshotTableDesc) {
+        String tableName = join.getPKSide().getTableIdentity();
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+        String[] pkCols = join.getPrimaryKey();
+
+        try {
+            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+        }
+    }
 
     private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
         String snapshotResPath;
@@ -576,11 +594,20 @@ public class CubeManager implements IRealizationProvider {
 
     @VisibleForTesting
     /*private*/ String generateStorageLocation(int engineType) {
+        String namePrefix = config.getHBaseTableNamePrefix();
+        String namespace = config.getHBaseStorageNameSpace();
         String tableName = "";
         do {
             StringBuffer sb = new StringBuffer();
             int identifierLength = HBASE_TABLE_LENGTH;
+            if (engineType != IEngineAware.ID_SPARK_II) {
+                if ((namespace.equals("default") || namespace.equals("")) == false) {
+                    sb.append(namespace).append(":");
+                }
+                sb.append(namePrefix);
+            } else {
                 identifierLength = PARQUET_IDENTIFIER_LENGTH;
+            }
             for (int i = 0; i < identifierLength; i++) {
                 sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
             }
@@ -601,6 +628,14 @@ public class CubeManager implements IRealizationProvider {
         return TableMetadataManager.getInstance(config);
     }
 
+    private DictionaryManager getDictionaryManager() {
+        return DictionaryManager.getInstance(config);
+    }
+
+    private SnapshotManager getSnapshotManager() {
+        return SnapshotManager.getInstance(config);
+    }
+
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }
@@ -1090,143 +1125,143 @@ public class CubeManager implements IRealizationProvider {
     // Dictionary/Snapshot related methods
     // ============================================================================
 
-//    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-//            throws IOException {
-//        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
-//    }
-//
-//    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-//            Dictionary<String> dict) throws IOException {
-//        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
-//    }
-//
-//    /**
-//     * return null if no dictionary for given column
-//     */
-//    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-//        return dictAssist.getDictionary(cubeSeg, col);
-//    }
-//
-//    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
-//        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
-//    }
-//
-//    private TableMetadataManager getMetadataManager() {
-//        return TableMetadataManager.getInstance(config);
-//    }
-//
-//    private class DictionaryAssist {
-//        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-//                throws IOException {
-//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-//            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-//                return null;
-//
-//            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
-//            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
-//
-//            saveDictionaryInfo(cubeSeg, col, dictInfo);
-//            return dictInfo;
-//        }
-//
-//        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-//                Dictionary<String> dict) throws IOException {
-//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-//            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-//                return null;
-//
-//            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
-//
-//            saveDictionaryInfo(cubeSeg, col, dictInfo);
-//            return dictInfo;
-//        }
-//
-//        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
-//                throws IOException {
-//            if (dictInfo == null)
-//                return;
-//
-//            // work on copy instead of cached objects
-//            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-//            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-//
-//            Dictionary<?> dict = dictInfo.getDictionaryObject();
-//            segCopy.putDictResPath(col, dictInfo.getResourcePath());
-//            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
-//
-//            CubeUpdate update = new CubeUpdate(cubeCopy);
-//            update.setToUpdateSegs(segCopy);
-//            updateCube(update);
-//        }
-//
-//        /**
-//         * return null if no dictionary for given column
-//         */
-//        @SuppressWarnings("unchecked")
-//        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-//            DictionaryInfo info = null;
-//            String dictResPath = null;
-//            try {
-//                DictionaryManager dictMgr = getDictionaryManager();
-//
-//                //tiretree global domain dic
-//                List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
-//                if (!globalDicts.isEmpty()) {
-//                    dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
-//                }
-//
-//                if (Objects.isNull(dictResPath)){
-//                    dictResPath = cubeSeg.getDictResPath(col);
-//                }
-//
-//                if (dictResPath == null)
-//                    return null;
-//
-//                info = dictMgr.getDictionaryInfo(dictResPath);
-//                if (info == null)
-//                    throw new IllegalStateException("No dictionary found by " + dictResPath
-//                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
-//            } catch (IOException e) {
-//                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
-//                        e);
-//            }
-//            return (Dictionary<String>) info.getDictionaryObject();
-//        }
-//
-//        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
-//                throws IOException {
-//            // work on copy instead of cached objects
-//            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-//            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-//
-//            TableMetadataManager metaMgr = getTableManager();
-//            SnapshotManager snapshotMgr = getSnapshotManager();
-//
-//            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
-//            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
-//            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
-//
-//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-//            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
-//                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-//                CubeUpdate update = new CubeUpdate(cubeCopy);
-//                update.setToUpdateSegs(segCopy);
-//                updateCube(update);
-//
-//                // Update the input cubeSeg after the resource store updated
-//                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
-//            } else {
-//                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
-//                Map<String, String> map = Maps.newHashMap();
-//                map.put(lookupTable, snapshot.getResourcePath());
-//                cubeUpdate.setUpdateTableSnapshotPath(map);
-//                updateCube(cubeUpdate);
-//
-//                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-//            }
-//            return snapshot;
-//        }
-//    }
+    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+            throws IOException {
+        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
+    }
+
+    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+            Dictionary<String> dict) throws IOException {
+        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
+    }
+
+    /**
+     * return null if no dictionary for given column
+     */
+    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+        return dictAssist.getDictionary(cubeSeg, col);
+    }
+
+    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
+        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
+    }
+
+    private TableMetadataManager getMetadataManager() {
+        return TableMetadataManager.getInstance(config);
+    }
+
+    private class DictionaryAssist {
+        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+                throws IOException {
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+                return null;
+
+            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
+
+            saveDictionaryInfo(cubeSeg, col, dictInfo);
+            return dictInfo;
+        }
+
+        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+                Dictionary<String> dict) throws IOException {
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+                return null;
+
+            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
+
+            saveDictionaryInfo(cubeSeg, col, dictInfo);
+            return dictInfo;
+        }
+
+        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
+                throws IOException {
+            if (dictInfo == null)
+                return;
+
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
+            Dictionary<?> dict = dictInfo.getDictionaryObject();
+            segCopy.putDictResPath(col, dictInfo.getResourcePath());
+            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(segCopy);
+            updateCube(update);
+        }
+
+        /**
+         * return null if no dictionary for given column
+         */
+        @SuppressWarnings("unchecked")
+        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+            DictionaryInfo info = null;
+            String dictResPath = null;
+            try {
+                DictionaryManager dictMgr = getDictionaryManager();
+
+                //tiretree global domain dic
+                List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
+                if (!globalDicts.isEmpty()) {
+                    dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
+                }
+
+                if (Objects.isNull(dictResPath)){
+                    dictResPath = cubeSeg.getDictResPath(col);
+                }
+
+                if (dictResPath == null)
+                    return null;
+
+                info = dictMgr.getDictionaryInfo(dictResPath);
+                if (info == null)
+                    throw new IllegalStateException("No dictionary found by " + dictResPath
+                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
+                        e);
+            }
+            return (Dictionary<String>) info.getDictionaryObject();
+        }
+
+        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
+                throws IOException {
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
+            TableMetadataManager metaMgr = getTableManager();
+            SnapshotManager snapshotMgr = getSnapshotManager();
+
+            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
+            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
+            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
+
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
+                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+                CubeUpdate update = new CubeUpdate(cubeCopy);
+                update.setToUpdateSegs(segCopy);
+                updateCube(update);
+
+                // Update the input cubeSeg after the resource store updated
+                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
+            } else {
+                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+                Map<String, String> map = Maps.newHashMap();
+                map.put(lookupTable, snapshot.getResourcePath());
+                cubeUpdate.setUpdateTableSnapshotPath(map);
+                updateCube(cubeUpdate);
+
+                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+            }
+            return snapshot;
+        }
+    }
 
     /**
      * To keep "select * from LOOKUP_TABLE" has consistent and latest result, we manually choose
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 5205cc5..c32da70 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -371,24 +371,26 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
         this.storageLocationIdentifier = storageLocationIdentifier;
     }
 
-//    public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
-//        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-//        for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
-//            result.put(col, (Dictionary<String>) getDictionary(col));
-//        }
-//        return result;
-//    }
-//
-//    public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
-//        Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
-//        for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
-//            result.put(col, getDictionary(col));
-//        }
-//        return result;
-//    }
+    public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
+            result.put(col, (Dictionary<String>) getDictionary(col));
+        }
+        return result;
+    }
+
+    public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
+        for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
+            result.put(col, getDictionary(col));
+        }
+        return result;
+    }
 
     public Dictionary<String> getDictionary(TblColRef col) {
-        return null;
+        TblColRef reuseCol = getCubeDesc().getDictionaryReuseColumn(col);
+        CubeManager cubeMgr = CubeManager.getInstance(this.getCubeInstance().getConfig());
+        return cubeMgr.getDictionary(this, reuseCol);
     }
 
     public CubeDimEncMap getDimensionEncodingMap() {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
new file mode 100644
index 0000000..0815942
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+import org.apache.kylin.dict.DictionaryProvider;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
+
+public class DictionaryGeneratorCLI {
+
+    private DictionaryGeneratorCLI() {
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
+
+    public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid,
+            DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeSegment segment = cube.getSegmentById(segmentID);
+
+        int retryTime = 0;
+        while (retryTime < 3) {
+            if (retryTime > 0) {
+                logger.info("Rebuild dictionary and snapshot for Cube: {}, Segment: {}, {} times.", cubeName, segmentID,
+                        retryTime);
+            }
+
+            processSegment(config, segment, uuid, factTableValueProvider, dictProvider);
+
+            if (isAllDictsAndSnapshotsReady(config, cubeName, segmentID)) {
+                break;
+            }
+            retryTime++;
+        }
+
+        if (retryTime >= 3) {
+            logger.error("Not all dictionaries and snapshots ready for cube segment: {}", segmentID);
+        } else {
+            logger.info("Succeed to build all dictionaries and snapshots for cube segment: {}", segmentID);
+        }
+    }
+
+    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid,
+            DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        // dictionary
+        for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
+            logger.info("Building dictionary for {}", col);
+            IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col);
+
+            Dictionary<String> preBuiltDict = null;
+            if (dictProvider != null) {
+                preBuiltDict = dictProvider.getDictionary(col);
+            }
+
+            if (preBuiltDict != null) {
+                logger.debug("Dict for '{}' has already been built, save it", col.getName());
+                cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict);
+            } else {
+                logger.debug("Dict for '{}' not pre-built, build it from {}", col.getName(), inpTable);
+                cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+            }
+        }
+
+        // snapshot
+        Set<String> toSnapshot = Sets.newHashSet();
+        Set<TableRef> toCheckLookup = Sets.newHashSet();
+        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
+            TableRef table = dim.getTableRef();
+            if (cubeSeg.getModel().isLookupTable(table)) {
+                // only the snapshot desc is not ext type, need to take snapshot
+                toSnapshot.add(table.getTableIdentity());
+                toCheckLookup.add(table);
+            }
+        }
+
+        for (String tableIdentity : toSnapshot) {
+            logger.info("Building snapshot of {}", tableIdentity);
+            cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid);
+        }
+
+        CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName());
+        cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid());
+        for (TableRef lookup : toCheckLookup) {
+            logger.info("Checking snapshot of {}", lookup);
+            try {
+                JoinDesc join = cubeSeg.getModel().getJoinsTree().getJoinByPKSide(lookup);
+                ILookupTable table = cubeMgr.getLookupTable(cubeSeg, join);
+                if (table != null) {
+                    IOUtils.closeStream(table);
+                }
+            } catch (Throwable th) {
+                throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
+            }
+        }
+    }
+
+    private static boolean isAllDictsAndSnapshotsReady(KylinConfig config, String cubeName, String segmentID) {
+        CubeInstance cube = CubeManager.getInstance(config).reloadCube(cubeName);
+        CubeSegment segment = cube.getSegmentById(segmentID);
+        ResourceStore store = ResourceStore.getStore(config);
+
+        // check dicts
+        logger.info("Begin to check if all dictionaries exist of Segment: {}", segmentID);
+        Map<String, String> dictionaries = segment.getDictionaries();
+        for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
+            String dictResPath = entry.getValue();
+            String dictKey = entry.getKey();
+            try {
+                DictionaryInfo dictInfo = store.getResource(dictResPath, DictionaryInfoSerializer.INFO_SERIALIZER);
+                if (dictInfo == null) {
+                    logger.warn("Dictionary=[key: {}, resource path: {}] doesn't exist in resource store", dictKey,
+                            dictResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("Dictionary=[key: {}, path: {}] failed to check, details: {}", dictKey, dictResPath, e);
+                return false;
+            }
+        }
+
+        // check snapshots
+        logger.info("Begin to check if all snapshots exist of Segment: {}", segmentID);
+        Map<String, String> snapshots = segment.getSnapshots();
+        for (Map.Entry<String, String> entry : snapshots.entrySet()) {
+            String snapshotKey = entry.getKey();
+            String snapshotResPath = entry.getValue();
+            try {
+                SnapshotTable snapshot = store.getResource(snapshotResPath, SnapshotTableSerializer.INFO_SERIALIZER);
+                if (snapshot == null) {
+                    logger.info("SnapshotTable=[key: {}, resource path: {}] doesn't exist in resource store",
+                            snapshotKey, snapshotResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("SnapshotTable=[key: {}, resource path: {}]  failed to check, details: {}", snapshotKey,
+                        snapshotResPath, e);
+                return false;
+            }
+        }
+
+        logger.info("All dictionaries and snapshots exist checking succeed for Cube Segment: {}", segmentID);
+        return true;
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
new file mode 100644
index 0000000..729a6da
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+
+public class DumpDictionaryCLI {
+
+    public static void main(String[] args) throws IOException {
+        for (String path : args) {
+            dump(new File(path));
+        }
+    }
+
+    public static void dump(File f) throws IOException {
+        if (f.isDirectory()) {
+            File[] files = f.listFiles();
+            if (files == null) {
+                return;
+            }
+            for (File c : files)
+                dump(c);
+            return;
+        }
+
+        if (f.getName().endsWith(".dict")) {
+            DictionaryInfoSerializer ser = new DictionaryInfoSerializer();
+            DictionaryInfo dictInfo = ser.deserialize(new DataInputStream(new FileInputStream(f)));
+
+            System.out.println("============================================================================");
+            System.out.println("File: " + f.getAbsolutePath());
+            System.out.println(new Date(dictInfo.getLastModified()));
+            System.out.println(JsonUtil.writeValueAsIndentString(dictInfo));
+            dictInfo.getDictionaryObject().dump(System.out);
+            System.out.println();
+        }
+    }
+}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 6bf67db..c0c6882 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -60,6 +60,8 @@ import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
+import org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
 import org.apache.kylin.metadata.MetadataConstants;
@@ -1557,30 +1559,30 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         return null;
     }
 
-//    public List<TblColRef> getAllGlobalDictColumns() {
-//        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
-//        List<DictionaryDesc> dictionaryDescList = getDictionaries();
-//
-//        if (dictionaryDescList == null) {
-//            return globalDictCols;
-//        }
-//
-//        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
-//            String cls = dictionaryDesc.getBuilderClass();
-//            if (GlobalDictionaryBuilder.class.getName().equals(cls)
-//                    || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
-//                globalDictCols.add(dictionaryDesc.getColumnRef());
-//        }
-//        return globalDictCols;
-//    }
+    public List<TblColRef> getAllGlobalDictColumns() {
+        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
+        List<DictionaryDesc> dictionaryDescList = getDictionaries();
+
+        if (dictionaryDescList == null) {
+            return globalDictCols;
+        }
+
+        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+            String cls = dictionaryDesc.getBuilderClass();
+            if (GlobalDictionaryBuilder.class.getName().equals(cls)
+                    || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
+                globalDictCols.add(dictionaryDesc.getColumnRef());
+        }
+        return globalDictCols;
+    }
 
     // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
-//    public List<TblColRef> getAllUHCColumns() {
-//        List<TblColRef> uhcColumns = new ArrayList<>();
-//        uhcColumns.addAll(getAllGlobalDictColumns());
-//        uhcColumns.addAll(getShardByColumns());
-//        return uhcColumns;
-//    }
+    public List<TblColRef> getAllUHCColumns() {
+        List<TblColRef> uhcColumns = new ArrayList<>();
+        uhcColumns.addAll(getAllGlobalDictColumns());
+        uhcColumns.addAll(getShardByColumns());
+        return uhcColumns;
+    }
 
     public String getProject() {
         DataModelDesc modelDesc = getModel();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
index 28861a1..30f533b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.cube.model;
 
+import org.apache.kylin.dict.lookup.SnapshotTable;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,8 +33,8 @@ SnapshotTableDesc implements java.io.Serializable{
     private String tableName;
 
     @JsonProperty("storage_type")
-    private String storageType = "Parquet";
-    
+    private String storageType = SnapshotTable.STORAGE_TYPE_METASTORE;
+
     @JsonProperty("local_cache_enable")
     private boolean enableLocalCache = true;
 
@@ -63,6 +65,10 @@ SnapshotTableDesc implements java.io.Serializable{
         this.global = global;
     }
 
+    public boolean isExtSnapshotTable() {
+        return !SnapshotTable.STORAGE_TYPE_METASTORE.equals(storageType);
+    }
+
     public boolean isEnableLocalCache() {
         return enableLocalCache;
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index 04535fd..9023f28 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -30,6 +30,7 @@ import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +92,11 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
                 }
             }
 
+            if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol) && rowKeyDesc.isUseDictionary(dictCol)) {
+                context.addResult(ResultLevel.ERROR, ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE + dictCol);
+                return;
+            }
+
             if (reuseCol != null) {
                 reuseDictionaries.add(dictDesc);
             } else {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 7c3958c..c6d0c00 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -18,150 +18,150 @@
 
 package org.apache.kylin.cube.util;
 
-//import java.io.IOException;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Locale;
-//import java.util.Map;
-//import java.util.Set;
-//
-//import org.apache.kylin.common.util.Dictionary;
-//import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeSegment;
-//import org.apache.kylin.cube.cuboid.Cuboid;
-//import org.apache.kylin.cube.model.CubeDesc;
-//import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-//import org.apache.kylin.dict.DictionaryGenerator;
-//import org.apache.kylin.dict.DictionaryInfo;
-//import org.apache.kylin.dict.DictionaryManager;
-//import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-//import org.apache.kylin.measure.hllc.HLLCounter;
-//import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-//import org.apache.kylin.metadata.model.TblColRef;
-//import org.apache.kylin.source.IReadableTable;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
-//import org.apache.kylin.shaded.com.google.common.collect.Maps;
-//import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
-//import org.apache.kylin.shaded.com.google.common.hash.Hasher;
-//import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
+import org.apache.kylin.shaded.com.google.common.hash.Hasher;
+import org.apache.kylin.shaded.com.google.common.hash.Hashing;
 
 /**
  */
 public class CubingUtils {
-//
-//    private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
-//
-//    public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
-//            Iterable<List<String>> streams) {
-//        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
-//        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
-//        final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
-//        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-//        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
-//
-//        final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-//        for (Long cuboidId : allCuboidIds) {
-//            result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
-//            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
-//
-//            long mask = Long.highestOneBit(baseCuboidId);
-//            int position = 0;
-//            for (int i = 0; i < rowkeyLength; i++) {
-//                if ((mask & cuboidId) > 0) {
-//                    cuboidBitSet[position] = i;
-//                    position++;
-//                }
-//                mask = mask >> 1;
-//            }
-//            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-//        }
-//
-//        HashFunction hf = Hashing.murmur3_32();
-//        byte[][] row_hashcodes = new byte[rowkeyLength][];
-//        for (List<String> row : streams) {
-//            //generate hash for each row key column
-//            for (int i = 0; i < rowkeyLength; i++) {
-//                Hasher hc = hf.newHasher();
-//                final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
-//                if (cell != null) {
-//                    row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
-//                } else {
-//                    row_hashcodes[i] = hc.putInt(0).hash().asBytes();
-//                }
-//            }
-//
-//            for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
-//                Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
-//                HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
-//                Hasher hc = hf.newHasher();
-//                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
-//                for (int position = 0; position < cuboidBitSet.length; position++) {
-//                    hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
-//                }
-//                counter.add(hc.hash().asBytes());
-//            }
-//        }
-//        return result;
-//    }
-//
-//    public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
-//            Iterable<List<String>> recordList) throws IOException {
-//        final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
-//                .listDimensionColumnsExcludingDerived(true);
-//        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-//        int index = 0;
-//        for (TblColRef column : columnsNeedToBuildDictionary) {
-//            tblColRefMap.put(index++, column);
-//        }
-//
-//        HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-//
-//        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
-//        for (List<String> row : recordList) {
-//            for (int i = 0; i < row.size(); i++) {
-//                String cell = row.get(i);
-//                if (tblColRefMap.containsKey(i)) {
-//                    valueMap.put(tblColRefMap.get(i), cell);
-//                }
-//            }
-//        }
-//        for (TblColRef tblColRef : valueMap.keySet()) {
-//            Set<String> values = valueMap.get(tblColRef);
-//            Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
-//                    new IterableDictionaryValueEnumerator(values));
-//            result.put(tblColRef, dict);
-//        }
-//        return result;
-//    }
-//
-//    @SuppressWarnings("unchecked")
-//    public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
-//            Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
-//        Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
-//
-//        for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
-//            final TblColRef tblColRef = entry.getKey();
-//            final Dictionary<String> dictionary = entry.getValue();
-//            IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
-//            signature.setLastModifiedTime(System.currentTimeMillis());
-//            signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
-//            signature.setSize(endOffset - startOffset);
-//            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
-//            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
-//            DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
-//            try {
-//                DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
-//                cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
-//                realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
-//            } catch (IOException e) {
-//                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
-//            }
-//        }
-//
-//        return realDictMap;
-//    }
-//
+
+    private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
+
+    public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
+            Iterable<List<String>> streams) {
+        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
+        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+        final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+        final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        for (Long cuboidId : allCuboidIds) {
+            result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
+
+            long mask = Long.highestOneBit(baseCuboidId);
+            int position = 0;
+            for (int i = 0; i < rowkeyLength; i++) {
+                if ((mask & cuboidId) > 0) {
+                    cuboidBitSet[position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+        }
+
+        HashFunction hf = Hashing.murmur3_32();
+        byte[][] row_hashcodes = new byte[rowkeyLength][];
+        for (List<String> row : streams) {
+            //generate hash for each row key column
+            for (int i = 0; i < rowkeyLength; i++) {
+                Hasher hc = hf.newHasher();
+                final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
+                if (cell != null) {
+                    row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
+                } else {
+                    row_hashcodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
+
+            for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
+                Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
+                HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
+                Hasher hc = hf.newHasher();
+                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+                for (int position = 0; position < cuboidBitSet.length; position++) {
+                    hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
+                }
+                counter.add(hc.hash().asBytes());
+            }
+        }
+        return result;
+    }
+
+    public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
+            Iterable<List<String>> recordList) throws IOException {
+        final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
+                .listDimensionColumnsExcludingDerived(true);
+        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+        int index = 0;
+        for (TblColRef column : columnsNeedToBuildDictionary) {
+            tblColRefMap.put(index++, column);
+        }
+
+        HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+
+        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        for (List<String> row : recordList) {
+            for (int i = 0; i < row.size(); i++) {
+                String cell = row.get(i);
+                if (tblColRefMap.containsKey(i)) {
+                    valueMap.put(tblColRefMap.get(i), cell);
+                }
+            }
+        }
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            Set<String> values = valueMap.get(tblColRef);
+            Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
+                    new IterableDictionaryValueEnumerator(values));
+            result.put(tblColRef, dict);
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
+            Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
+        Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
+
+        for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
+            final TblColRef tblColRef = entry.getKey();
+            final Dictionary<String> dictionary = entry.getValue();
+            IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
+            signature.setLastModifiedTime(System.currentTimeMillis());
+            signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
+            signature.setSize(endOffset - startOffset);
+            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
+            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+            DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
+            try {
+                DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
+                cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
+                realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
+            } catch (IOException e) {
+                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+            }
+        }
+
+        return realDictMap;
+    }
+
 }
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index 90cabfa..de436d0 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube.model.validation.rule;
 
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_DUPLICATE_DICTIONARY_COLUMN;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_EMPTY;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_SET;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_TRANSITIVE_REUSE;
@@ -36,6 +37,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,6 +75,7 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
     @Test
     public void testBadDesc() throws IOException {
         testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, "FakeBuilderClass"));
+        testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, GlobalDictionaryBuilder.class.getName()));
     }
 
     @Test
@@ -92,6 +95,17 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
                 DictionaryDesc.create("price", "lstg_site_id", null));
     }
 
+    @Test
+    public void testBadDesc5() throws IOException {
+        testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE,
+                DictionaryDesc.create("CATEG_LVL2_NAME", null, GlobalDictionaryBuilder.class.getName()));
+    }
+
+    @Test
+    public void testGoodDesc2() throws IOException {
+        testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, GlobalDictionaryBuilder.class.getName()));
+    }
+
     private void testDictionaryDesc(String expectMessage, DictionaryDesc... descs) throws IOException {
         DictionaryRule rule = new DictionaryRule();
         File f = new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_without_slr_left_join_desc.json");
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index f132569..a0730ff 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -40,196 +40,196 @@ import com.google.common.collect.Lists;
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class DictionaryGenerator {
-//
-//    private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
-//
-//    public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
-//        Preconditions.checkNotNull(dataType, "dataType cannot be null");
-//
-//        // build dict, case by data type
-//        IDictionaryBuilder builder;
-//        boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
-//        if (dataType.isNumberFamily())
-//            builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
-//        else
-//            builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
-//        return builder;
-//    }
-//
-//    public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
-//            throws IOException {
-//        return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
-//    }
-//
-//    static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
-//            IDictionaryValueEnumerator valueEnumerator) throws IOException {
-//        int baseId = 0; // always 0 for now
-//        int nSamples = 5;
-//        ArrayList<String> samples = new ArrayList<String>(nSamples);
-//
-//        // init the builder
-//        builder.init(dictInfo, baseId, null);
-//
-//        // add values
-//        try {
-//            while (valueEnumerator.moveNext()) {
-//                String value = valueEnumerator.current();
-//
-//                boolean accept = builder.addValue(value);
-//
-//                if (accept && samples.size() < nSamples && samples.contains(value) == false)
-//                    samples.add(value);
-//            }
-//        } catch (IOException e) {
-//            logger.error("Error during adding dict value.", e);
-//            builder.clear();
-//            throw e;
-//        }
-//
-//        // build
-//        Dictionary<String> dict = builder.build();
-//        logger.debug("Dictionary cardinality: " + dict.getSize());
-//        logger.debug("Dictionary builder class: " + builder.getClass().getName());
-//        logger.debug("Dictionary class: " + dict.getClass().getName());
-//        // log a few samples
-//        StringBuilder buf = new StringBuilder();
-//        for (String s : samples) {
-//            if (buf.length() > 0) {
-//                buf.append(", ");
-//            }
-//            buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
-//        }
-//        logger.debug("Dictionary value samples: " + buf.toString());
-//
-//        return dict;
-//    }
-//
-//    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
-//        List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
-//            @Nullable
-//            @Override
-//            public Dictionary<String> apply(@Nullable DictionaryInfo input) {
-//                return input.dictionaryObject;
-//            }
-//        });
-//        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
-//    }
-//
-//    private static class StringTrieDictBuilder implements IDictionaryBuilder {
-//        int baseId;
-//        TrieDictionaryBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            this.baseId = baseId;
-//            this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (value == null)
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build(baseId);
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
-//
-//    private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
-//        TrieDictionaryForestBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (value == null)
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build();
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
-//
-//    @SuppressWarnings("deprecation")
-//    private static class NumberTrieDictBuilder implements IDictionaryBuilder {
-//        int baseId;
-//        NumberDictionaryBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            this.baseId = baseId;
-//            this.builder = new NumberDictionaryBuilder();
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (StringUtils.isBlank(value)) // empty string is treated as null
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build(baseId);
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
-//
-//    private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
-//        NumberDictionaryForestBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            builder = new NumberDictionaryForestBuilder(baseId);
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (StringUtils.isBlank(value)) // empty string is treated as null
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build();
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
+
+    public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
+        Preconditions.checkNotNull(dataType, "dataType cannot be null");
+
+        // build dict, case by data type
+        IDictionaryBuilder builder;
+        boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
+        if (dataType.isNumberFamily())
+            builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
+        else
+            builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
+        return builder;
+    }
+
+    public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
+            throws IOException {
+        return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
+    }
+
+    static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
+            IDictionaryValueEnumerator valueEnumerator) throws IOException {
+        int baseId = 0; // always 0 for now
+        int nSamples = 5;
+        ArrayList<String> samples = new ArrayList<String>(nSamples);
+
+        // init the builder
+        builder.init(dictInfo, baseId, null);
+
+        // add values
+        try {
+            while (valueEnumerator.moveNext()) {
+                String value = valueEnumerator.current();
+
+                boolean accept = builder.addValue(value);
+
+                if (accept && samples.size() < nSamples && samples.contains(value) == false)
+                    samples.add(value);
+            }
+        } catch (IOException e) {
+            logger.error("Error during adding dict value.", e);
+            builder.clear();
+            throw e;
+        }
+
+        // build
+        Dictionary<String> dict = builder.build();
+        logger.debug("Dictionary cardinality: " + dict.getSize());
+        logger.debug("Dictionary builder class: " + builder.getClass().getName());
+        logger.debug("Dictionary class: " + dict.getClass().getName());
+        // log a few samples
+        StringBuilder buf = new StringBuilder();
+        for (String s : samples) {
+            if (buf.length() > 0) {
+                buf.append(", ");
+            }
+            buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
+        }
+        logger.debug("Dictionary value samples: " + buf.toString());
+
+        return dict;
+    }
+
+    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
+        List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
+            @Nullable
+            @Override
+            public Dictionary<String> apply(@Nullable DictionaryInfo input) {
+                return input.dictionaryObject;
+            }
+        });
+        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
+    }
+
+    private static class StringTrieDictBuilder implements IDictionaryBuilder {
+        int baseId;
+        TrieDictionaryBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            this.baseId = baseId;
+            this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (value == null)
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build(baseId);
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
+
+    private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
+        TrieDictionaryForestBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (value == null)
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build();
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private static class NumberTrieDictBuilder implements IDictionaryBuilder {
+        int baseId;
+        NumberDictionaryBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            this.baseId = baseId;
+            this.builder = new NumberDictionaryBuilder();
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (StringUtils.isBlank(value)) // empty string is treated as null
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build(baseId);
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
+
+    private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
+        NumberDictionaryForestBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            builder = new NumberDictionaryForestBuilder(baseId);
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (StringUtils.isBlank(value)) // empty string is treated as null
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build();
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
 
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
similarity index 96%
rename from core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
rename to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
index 794b1e5..dccb7c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.cube;
+package org.apache.kylin.dict.lookup;
 
 import org.apache.kylin.common.util.Array;
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index c72681b..9d591b5 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -63,7 +63,7 @@ public class SnapshotManager {
                 SnapshotManager.logger.info("Snapshot with resource path {} is removed due to {}",
                         notification.getKey(), notification.getCause());
             }
-        }).maximumSize(100L)//
+        }).maximumSize(config.getCachedSnapshotMaxEntrySize())//
                 .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, SnapshotTable>() {
                     @Override
                     public SnapshotTable load(String key) throws Exception {
@@ -173,11 +173,11 @@ public class SnapshotManager {
             KylinConfig cubeConfig) throws IOException {
         String dup = checkDupByInfo(snapshot);
 
-//        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
-//            throw new IllegalStateException(
-//                    "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
-//                            + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
-//        }
+        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
+            throw new IllegalStateException(
+                    "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
+                            + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+        }
 
         if (dup != null) {
             logger.info("Identical input {}, reuse existing snapshot at {}", table.getSignature(), dup);
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 31cc081..2c4bc6a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
@@ -111,16 +113,16 @@ public class JoinedFlatTable {
             kylinConfig = (flatDesc.getSegment()).getConfig();
         }
 
-//        if (kylinConfig.isAdvancedFlatTableUsed()) {
-//            try {
-//                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
-//                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
-//                        JobEngineConfig.class);
-//                return (String) method.invoke(null, flatDesc);
-//            } catch (Exception e) {
-//                throw new RuntimeException(e);
-//            }
-//        }
+        if (kylinConfig.isAdvancedFlatTableUsed()) {
+            try {
+                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
+                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
+                        JobEngineConfig.class);
+                return (String) method.invoke(null, flatDesc);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
 
         return "INSERT OVERWRITE TABLE " + quoteIdentifier(flatDesc.getTableName(), null) + " " + generateSelectDataStatement(flatDesc)
                 + ";\n";
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
index 937c1ad..422a802 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
@@ -32,7 +32,6 @@ import org.apache.kylin.shaded.com.google.common.base.Predicate;
 import org.apache.kylin.shaded.com.google.common.collect.Iterables;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 
-@Deprecated
 public abstract class DimensionEncodingFactory {
 
     private static final Logger logger = LoggerFactory.getLogger(DimensionEncodingFactory.class);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index 55bdb4b..acdc2e9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.threadlocal.InternalThreadLocal;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -31,7 +32,19 @@ public class StorageFactory {
     // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
     private static InternalThreadLocal<ImplementationSwitch<IStorage>> storages = new InternalThreadLocal<>();
 
+    private static IStorage configuredUseLocalStorage;
+
+    static {
+        String localStorageImpl = KylinConfig.getInstanceFromEnv().getLocalStorageImpl();
+        if (localStorageImpl != null){
+            configuredUseLocalStorage = (IStorage) ClassUtil.newInstance(localStorageImpl);
+        }
+    }
+
     public static IStorage storage(IStorageAware aware) {
+        if (configuredUseLocalStorage != null) {
+            return configuredUseLocalStorage;
+        }
         ImplementationSwitch<IStorage> current = storages.get();
         if (storages.get() == null) {
             current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getStorageEngines(), IStorage.class);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 3fa104d..e6e0737 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -28,6 +28,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -79,9 +80,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
             TupleFilter havingFilter, StorageContext context) {
         this.context = context;
 
-//        this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
-//        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
-//        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
+        this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
+        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
 
         this.cubeSegment = cubeSegment;
         this.cubeDesc = cubeSegment.getCubeDesc();
@@ -147,9 +148,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
      */
     public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
-//        this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
-//        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
-//        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
+        this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
+        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
 
         this.gtInfo = info;
 
@@ -174,7 +175,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
                     .setRtAggrMetrics(gtRtAggrMetrics).setDynamicColumns(gtDynColumns)
                     .setExprsPushDown(tupleExpressionMap)//
                     .setAllowStorageAggregation(context.isNeedStorageAggregation())
-                    .setAggCacheMemThreshold(0)//
+                    .setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB())//
                     .setStoragePushDownLimit(context.getFinalPushDownLimit())
                     .setStorageLimitLevel(context.getStorageLimitLevel()).setHavingFilterPushDown(havingFilter)
                     .createGTScanRequest();
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index b76fe47..3adbb8e 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -26,10 +26,12 @@ import java.util.Set;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dict.BuiltInFunctionTransformer;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.metadata.expression.TupleExpression;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.StringCodeSystem;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilterSerializer;
@@ -66,6 +68,10 @@ public class CubeSegmentScanner implements Iterable<GTRecord> {
         byte[] serialize = TupleFilterSerializer.serialize(originalfilter, StringCodeSystem.INSTANCE);
         TupleFilter filter = TupleFilterSerializer.deserialize(serialize, StringCodeSystem.INSTANCE);
 
+        // translate FunctionTupleFilter to IN clause
+        ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
+        filter = translator.transform(filter);
+
         CubeScanRangePlanner scanRangePlanner;
         try {
             scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups,
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index db08e62..4fb71bb 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -36,6 +36,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
 import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
@@ -44,7 +45,6 @@ import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.cube.ILookupTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,7 +97,7 @@ public class CubeTupleConverter implements ITupleConverter {
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
         usedLookupTables = Lists.newArrayList();
-//        eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
+        eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
         autoJustByTimezone = eventTimezone.length() > 0
                 && cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable();
         if (autoJustByTimezone) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 63ef36d..804ce3f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,7 +33,6 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.ILookupTable;
 import org.apache.kylin.cube.RawQueryLastHacker;
 import org.apache.kylin.cube.common.SegmentPruner;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -42,6 +41,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index fea4550..9bfdd76 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -25,9 +25,9 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.ILookupTable;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 47a3ba7..574bb9f 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -18,673 +18,721 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.NumberDictionaryForestBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
 
 public class DictGridTableTest extends LocalFileMetadataTestCase {
-//
-//    private GridTable table;
-//    private GTInfo info;
-//    private CompareTupleFilter timeComp0;
-//    private CompareTupleFilter timeComp1;
-//    private CompareTupleFilter timeComp2;
-//    private CompareTupleFilter timeComp3;
-//    private CompareTupleFilter timeComp4;
-//    private CompareTupleFilter timeComp5;
-//    private CompareTupleFilter timeComp6;
-//    private CompareTupleFilter timeComp7;
-//    private CompareTupleFilter ageComp1;
-//    private CompareTupleFilter ageComp2;
-//    private CompareTupleFilter ageComp3;
-//    private CompareTupleFilter ageComp4;
-//
-//    @After
-//    public void after() throws Exception {
-//
-//        this.cleanupTestMetadata();
-//    }
-//
-//    @Before
-//    public void setup() throws IOException {
-//
-//        this.createTestMetadata();
-//
-//        table = newTestTable();
-//        info = table.getInfo();
-//
-//        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
-//        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-//        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
-//        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
-//        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
-//        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
-//        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
-//        timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
-//        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
-//        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
-//        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
-//        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
-//
-//    }
-//
-//    @Test
-//    public void verifySegmentSkipping() {
-//
-//        ByteArray segmentStart = enc(info, 0, "2015-01-14");
-//        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
-//        assertEquals(segmentStart, segmentStartX);
-//
-//        {
-//            LogicalTupleFilter filter = and(timeComp0, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());//scan range are [close,close]
-//            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-//            assertEquals(1, r.get(0).fuzzyKeys.size());
-//            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-//        }
-//        {
-//            LogicalTupleFilter filter = and(timeComp2, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//        }
-//        {
-//            LogicalTupleFilter filter = and(timeComp4, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//        }
-//        {
-//            LogicalTupleFilter filter = and(timeComp5, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//        }
-//        {
-//            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
-//                    and(timeComp6, ageComp1));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(2, r.size());
-//            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
-//            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-//                    r.get(1).fuzzyKeys.toString());
-//        }
-//        {
-//            LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
-//        }
-//        {
-//            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(2, r.size());
-//            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
-//            assertEquals(0, r.get(1).fuzzyKeys.size());
-//        }
-//        {
-//            //skip FALSE filter
-//            LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(0, r.size());
-//        }
-//        {
-//            //TRUE or FALSE filter
-//            LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//            assertEquals("[null, null]-[null, null]", r.get(0).toString());
-//        }
-//        {
-//            //TRUE or other filter
-//            LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//            assertEquals("[null, null]-[null, null]", r.get(0).toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifySegmentSkipping2() {
-//        {
-//            LogicalTupleFilter filter = and(timeComp0, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());//scan range are [close,close]
-//            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-//            assertEquals(1, r.get(0).fuzzyKeys.size());
-//            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-//        }
-//
-//        {
-//            LogicalTupleFilter filter = and(timeComp5, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());//scan range are [close,close]
-//        }
-//    }
-//
-//    @Test
-//    public void verifyScanRangePlanner() {
-//
-//        // flatten or-and & hbase fuzzy value
-//        {
-//            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
-//            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
-//        }
-//
-//        // pre-evaluate ever false
-//        {
-//            LogicalTupleFilter filter = and(timeComp1, timeComp2);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(0, r.size());
-//        }
-//
-//        // pre-evaluate ever true
-//        {
-//            LogicalTupleFilter filter = or(timeComp1, ageComp4);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals("[[null, null]-[null, null]]", r.toString());
-//        }
-//
-//        // merge overlap range
-//        {
-//            LogicalTupleFilter filter = or(timeComp1, timeComp3);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals("[[null, null]-[null, null]]", r.toString());
-//        }
-//
-//        // merge too many ranges
-//        {
-//            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
-//                    and(timeComp4, ageComp3));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(3, r.size());
-//            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
-//            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
-//            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
-//            planner.setMaxScanRanges(2);
-//            List<GTScanRange> r2 = planner.planScanRanges();
-//            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifyFirstRow() throws IOException {
-//        doScanAndVerify(table,
-//                new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
-//                        .setFilterPushDown(null).createGTScanRequest(),
-//                "[1421193600000, 30, Yang, 10, 10.5]", //
-//                "[1421193600000, 30, Luke, 10, 10.5]", //
-//                "[1421280000000, 20, Dong, 10, 10.5]", //
-//                "[1421280000000, 20, Jason, 10, 10.5]", //
-//                "[1421280000000, 30, Xu, 10, 10.5]", //
-//                "[1421366400000, 20, Mahone, 10, 10.5]", //
-//                "[1421366400000, 20, Qianhao, 10, 10.5]", //
-//                "[1421366400000, 30, George, 10, 10.5]", //
-//                "[1421366400000, 30, Shaofeng, 10, 10.5]", //
-//                "[1421452800000, 10, Kejia, 10, 10.5]");
-//    }
-//
-//    //for testing GTScanRequest serialization and deserialization
-//    public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
-//        ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-//        GTScanRequest.serializer.serialize(origin, buffer);
-//        buffer.flip();
-//        GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
-//
-//        Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
-//        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
-//        return sGTScanRequest;
-//    }
-//
-//    @Test
-//    public void verifyScanWithUnevaluatableFilter() throws IOException {
-//        GTInfo info = table.getInfo();
-//
-//        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-//        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
-//        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
-//        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
-//
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-//                .setFilterPushDown(filter).createGTScanRequest();
-//
-//        // note the unEvaluatable column 1 in filter is added to group by
-//        assertEquals(
-//                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-//                req.toString());
-//
-//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
-//                "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
-//                "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
-//    }
-//
-//    @Test
-//    public void verifyScanWithEvaluatableFilter() throws IOException {
-//        GTInfo info = table.getInfo();
-//
-//        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-//        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
-//        LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-//                .setFilterPushDown(filter).createGTScanRequest();
-//        // note the evaluatable column 1 in filter is added to returned columns but not in group by
-//        assertEquals(
-//                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-//                req.toString());
-//
-//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
-//                "[1421366400000, 20, null, 40, null]");
-//    }
-//
-//    @Test
-//    public void verifyAggregateAndHavingFilter() throws IOException {
-//        GTInfo info = table.getInfo();
-//
-//        TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
-//        havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
-//        CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
-//
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
-//                .setHavingFilterPushDown(havingFilter).createGTScanRequest();
-//
-//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
-//                "[null, 30, null, null, 52.5]");
-//    }
-//
-//    @SuppressWarnings("unused")
-//    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
-//            throws IOException {
-//        long start = System.currentTimeMillis();
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setFilterPushDown(filter).createGTScanRequest();
-//        int i = 0;
-//        try (IGTScanner scanner = table.scan(req)) {
-//            for (GTRecord r : scanner) {
-//                i++;
-//            }
-//        }
-//        long end = System.currentTimeMillis();
-//        System.out.println(
-//                (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants1() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
-//        LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//        assertEquals(
-//                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
-//                newFilter.toString());
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants2() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//
-//        // $1<"9" round down to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<"10" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<"11" round down to <="10"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<="9" round down to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(ConstantTupleFilter.FALSE, newFilter);
-//        }
-//
-//        // $1<="10" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<="11" round down to <="10"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-//                    newFilter.toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants3() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//
-//        // $1>"101" round up to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>"100" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>"99" round up to >="100"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>="101" round up to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(ConstantTupleFilter.FALSE, newFilter);
-//        }
-//
-//        // $1>="100" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>="99" round up to >="100"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-//                    newFilter.toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants4() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
-//        LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        // $1 in ("9", "10", "15") has only "10" left
-//        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//        assertEquals(
-//                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
-//                newFilter.toString());
-//    }
-//
-//    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
-//        System.out.println(req);
-//        try (IGTScanner scanner = table.scan(req)) {
-//            int i = 0;
-//            for (GTRecord r : scanner) {
-//                System.out.println(r);
-//                if (verifyRows == null || i >= verifyRows.length) {
-//                    Assert.fail();
-//                }
-//                assertEquals(verifyRows[i], r.toString());
-//                i++;
-//            }
-//        }
-//    }
-//
-//    public static ByteArray enc(GTInfo info, int col, String value) {
-//        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
-//        info.getCodeSystem().encodeColumnValue(col, value, buf);
-//        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
-//    }
-//
-//    public static ExtractTupleFilter unevaluatable(TblColRef col) {
-//        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
-//        r.addChild(new ColumnTupleFilter(col));
-//        return r;
-//    }
-//
-//    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
-//        CompareTupleFilter result = new CompareTupleFilter(op);
-//        result.addChild(new ColumnTupleFilter(col));
-//        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
-//        return result;
-//    }
-//
-//    public static LogicalTupleFilter and(TupleFilter... children) {
-//        return logic(FilterOperatorEnum.AND, children);
-//    }
-//
-//    public static LogicalTupleFilter or(TupleFilter... children) {
-//        return logic(FilterOperatorEnum.OR, children);
-//    }
-//
-//    public static LogicalTupleFilter not(TupleFilter child) {
-//        return logic(FilterOperatorEnum.NOT, child);
-//    }
-//
-//    public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
-//        LogicalTupleFilter result = new LogicalTupleFilter(op);
-//        for (TupleFilter c : children) {
-//            result.addChild(c);
-//        }
-//        return result;
-//    }
-//
-//    public static GridTable newTestTable() throws IOException {
-//        GTInfo info = newInfo();
-//        GTSimpleMemStore store = new GTSimpleMemStore(info);
-//        GridTable table = new GridTable(info, store);
-//
-//        GTRecord r = new GTRecord(table.getInfo());
-//        GTBuilder builder = table.rebuild();
-//
-//        builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-//        builder.close();
-//
-//        return table;
-//    }
-//
-//    static GridTable newTestPerfTable() throws IOException {
-//        GTInfo info = newInfo();
-//        GTSimpleMemStore store = new GTSimpleMemStore(info);
-//        GridTable table = new GridTable(info, store);
-//
-//        GTRecord r = new GTRecord(table.getInfo());
-//        GTBuilder builder = table.rebuild();
-//
-//        for (int i = 0; i < 100000; i++) {
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-//        }
-//        builder.close();
-//
-//        return table;
-//    }
-//
-//    static GTInfo newInfo() {
-//        Builder builder = GTInfo.builder();
-//        builder.setCodeSystem(newDictCodeSystem());
-//        builder.setColumns(//
-//                DataType.getType("timestamp"), //
-//                DataType.getType("integer"), //
-//                DataType.getType("varchar(10)"), //
-//                DataType.getType("bigint"), //
-//                DataType.getType("decimal") //
-//        );
-//        builder.setPrimaryKey(setOf(0, 1));
-//        builder.setColumnPreferIndex(setOf(0));
-//        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
-//        builder.enableRowBlock(4);
-//        GTInfo info = builder.build();
-//        return info;
-//    }
-//
-//    private static CubeCodeSystem newDictCodeSystem() {
-//        DimensionEncoding[] dimEncs = new DimensionEncoding[3];
-//        dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
-//        dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
-//        return new CubeCodeSystem(dimEncs);
-//    }
-//
-//    private static Dictionary newDictionaryOfString() {
-//        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
-//        builder.addValue("Dong");
-//        builder.addValue("George");
-//        builder.addValue("Jason");
-//        builder.addValue("Kejia");
-//        builder.addValue("Luke");
-//        builder.addValue("Mahone");
-//        builder.addValue("Qianhao");
-//        builder.addValue("Shaofeng");
-//        builder.addValue("Xu");
-//        builder.addValue("Yang");
-//        return builder.build(0);
-//    }
-//
-//    private static Dictionary newDictionaryOfInteger() {
-//        NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
-//        builder.addValue("10");
-//        builder.addValue("20");
-//        builder.addValue("30");
-//        builder.addValue("40");
-//        builder.addValue("50");
-//        builder.addValue("60");
-//        builder.addValue("70");
-//        builder.addValue("80");
-//        builder.addValue("90");
-//        builder.addValue("100");
-//        return builder.build();
-//    }
-//
-//    public static ImmutableBitSet setOf(int... values) {
-//        BitSet set = new BitSet();
-//        for (int i : values)
-//            set.set(i);
-//        return new ImmutableBitSet(set);
-//    }
+
+    private GridTable table;
+    private GTInfo info;
+    private CompareTupleFilter timeComp0;
+    private CompareTupleFilter timeComp1;
+    private CompareTupleFilter timeComp2;
+    private CompareTupleFilter timeComp3;
+    private CompareTupleFilter timeComp4;
+    private CompareTupleFilter timeComp5;
+    private CompareTupleFilter timeComp6;
+    private CompareTupleFilter timeComp7;
+    private CompareTupleFilter ageComp1;
+    private CompareTupleFilter ageComp2;
+    private CompareTupleFilter ageComp3;
+    private CompareTupleFilter ageComp4;
+
+    @After
+    public void after() throws Exception {
+
+        this.cleanupTestMetadata();
+    }
+
+    @Before
+    public void setup() throws IOException {
+
+        this.createTestMetadata();
+
+        table = newTestTable();
+        info = table.getInfo();
+
+        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
+        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
+        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
+        timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
+        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+
+    }
+
+    @Test
+    public void verifySegmentSkipping() {
+
+        ByteArray segmentStart = enc(info, 0, "2015-01-14");
+        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
+        assertEquals(segmentStart, segmentStartX);
+
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());//scan range are [close,close]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp2, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp4, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+        }
+        {
+            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
+                    and(timeComp6, ageComp1));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
+            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
+                    r.get(1).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
+        }
+        {
+            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+            assertEquals(0, r.get(1).fuzzyKeys.size());
+        }
+        {
+            //skip FALSE filter
+            LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+        {
+            //TRUE or FALSE filter
+            LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+        }
+        {
+            //TRUE or other filter
+            LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+        }
+    }
+
+    @Test
+    public void verifySegmentSkipping2() {
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());//scan range are [close,close]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());//scan range are [close,close]
+        }
+    }
+
+    @Test
+    public void verifyScanRangePlanner() {
+
+        // flatten or-and & hbase fuzzy value
+        {
+            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
+            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+
+        // pre-evaluate ever false
+        {
+            LogicalTupleFilter filter = and(timeComp1, timeComp2);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+
+        // pre-evaluate ever true
+        {
+            LogicalTupleFilter filter = or(timeComp1, ageComp4);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+
+        // merge overlap range
+        {
+            LogicalTupleFilter filter = or(timeComp1, timeComp3);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+
+        // merge too many ranges
+        {
+            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
+                    and(timeComp4, ageComp3));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(3, r.size());
+            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+            planner.setMaxScanRanges(2);
+            List<GTScanRange> r2 = planner.planScanRanges();
+            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+        }
+    }
+
+    @Test
+    public void verifyFirstRow() throws IOException {
+        doScanAndVerify(table,
+                new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
+                        .setFilterPushDown(null).createGTScanRequest(),
+                "[1421193600000, 30, Yang, 10, 10.5]", //
+                "[1421193600000, 30, Luke, 10, 10.5]", //
+                "[1421280000000, 20, Dong, 10, 10.5]", //
+                "[1421280000000, 20, Jason, 10, 10.5]", //
+                "[1421280000000, 30, Xu, 10, 10.5]", //
+                "[1421366400000, 20, Mahone, 10, 10.5]", //
+                "[1421366400000, 20, Qianhao, 10, 10.5]", //
+                "[1421366400000, 30, George, 10, 10.5]", //
+                "[1421366400000, 30, Shaofeng, 10, 10.5]", //
+                "[1421452800000, 10, Kejia, 10, 10.5]");
+    }
+
+    //for testing GTScanRequest serialization and deserialization
+    public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
+        ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
+        GTScanRequest.serializer.serialize(origin, buffer);
+        buffer.flip();
+        GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
+
+        Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
+        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
+        return sGTScanRequest;
+    }
+
+    @Test
+    public void verifyScanWithUnevaluatableFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+                .setFilterPushDown(filter).createGTScanRequest();
+
+        // note the unEvaluatable column 1 in filter is added to group by
+        assertEquals(
+                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+                req.toString());
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
+                "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
+                "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
+    }
+
+    @Test
+    public void verifyScanWithEvaluatableFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+                .setFilterPushDown(filter).createGTScanRequest();
+        // note the evaluatable column 1 in filter is added to returned columns but not in group by
+        assertEquals(
+                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+                req.toString());
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
+                "[1421366400000, 20, null, 40, null]");
+    }
+
+    @Test
+    public void verifyAggregateAndHavingFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
+        havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
+        CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
+                .setHavingFilterPushDown(havingFilter).createGTScanRequest();
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
+                "[null, 30, null, null, 52.5]");
+    }
+
+    @SuppressWarnings("unused")
+    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
+            throws IOException {
+        long start = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setFilterPushDown(filter).createGTScanRequest();
+        int i = 0;
+        try (IGTScanner scanner = table.scan(req)) {
+            for (GTRecord r : scanner) {
+                i++;
+            }
+        }
+        long end = System.currentTimeMillis();
+        System.out.println(
+                (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
+    }
+
+    @Test
+    public void verifyConvertFilterConstants1() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals(
+                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
+                newFilter.toString());
+    }
+
+    @Test
+    public void verifyConvertFilterConstants2() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        
+        // $1<"9" round down to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+                    newFilter.toString());
+        }
+
+        // $1<"10" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
+                    newFilter.toString());
+        }
+        
+        // $1<"11" round down to <="10"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+                    newFilter.toString());
+        }
+        
+        // $1<="9" round down to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(ConstantTupleFilter.FALSE, newFilter);
+        }
+        
+        // $1<="10" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+                    newFilter.toString());
+        }
+        
+        // $1<="11" round down to <="10"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+                    newFilter.toString());
+        }
+    }
+
+    @Test
+    public void verifyConvertFilterConstants3() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+        
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        
+        // $1>"101" round up to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+                    newFilter.toString());
+        }
+
+        // $1>"100" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
+                    newFilter.toString());
+        }
+        
+        // $1>"99" round up to >="100"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+                    newFilter.toString());
+        }
+        
+        // $1>="101" round up to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(ConstantTupleFilter.FALSE, newFilter);
+        }
+        
+        // $1>="100" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+                    newFilter.toString());
+        }
+        
+        // $1>="99" round up to >="100"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+                    newFilter.toString());
+        }
+    }
+
+    @Test
+    public void verifyConvertFilterConstants4() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        // $1 in ("9", "10", "15") has only "10" left
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals(
+                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
+                newFilter.toString());
+    }
+
+    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
+        System.out.println(req);
+        try (IGTScanner scanner = table.scan(req)) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                System.out.println(r);
+                if (verifyRows == null || i >= verifyRows.length) {
+                    Assert.fail();
+                }
+                assertEquals(verifyRows[i], r.toString());
+                i++;
+            }
+        }
+    }
+
+    public static ByteArray enc(GTInfo info, int col, String value) {
+        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+        info.getCodeSystem().encodeColumnValue(col, value, buf);
+        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
+    }
+
+    public static ExtractTupleFilter unevaluatable(TblColRef col) {
+        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+        r.addChild(new ColumnTupleFilter(col));
+        return r;
+    }
+
+    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
+        CompareTupleFilter result = new CompareTupleFilter(op);
+        result.addChild(new ColumnTupleFilter(col));
+        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
+        return result;
+    }
+
+    public static LogicalTupleFilter and(TupleFilter... children) {
+        return logic(FilterOperatorEnum.AND, children);
+    }
+
+    public static LogicalTupleFilter or(TupleFilter... children) {
+        return logic(FilterOperatorEnum.OR, children);
+    }
+
+    public static LogicalTupleFilter not(TupleFilter child) {
+        return logic(FilterOperatorEnum.NOT, child);
+    }
+
+    public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
+        LogicalTupleFilter result = new LogicalTupleFilter(op);
+        for (TupleFilter c : children) {
+            result.addChild(c);
+        }
+        return result;
+    }
+
+    public static GridTable newTestTable() throws IOException {
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo());
+        GTBuilder builder = table.rebuild();
+
+        builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+        builder.close();
+
+        return table;
+    }
+
+    static GridTable newTestPerfTable() throws IOException {
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo());
+        GTBuilder builder = table.rebuild();
+
+        for (int i = 0; i < 100000; i++) {
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+        }
+        builder.close();
+
+        return table;
+    }
+
+    static GTInfo newInfo() {
+        Builder builder = GTInfo.builder();
+        builder.setCodeSystem(newDictCodeSystem());
+        builder.setColumns(//
+                DataType.getType("timestamp"), //
+                DataType.getType("integer"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("bigint"), //
+                DataType.getType("decimal") //
+        );
+        builder.setPrimaryKey(setOf(0, 1));
+        builder.setColumnPreferIndex(setOf(0));
+        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
+        builder.enableRowBlock(4);
+        GTInfo info = builder.build();
+        return info;
+    }
+
+    private static CubeCodeSystem newDictCodeSystem() {
+        DimensionEncoding[] dimEncs = new DimensionEncoding[3];
+        dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
+        dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
+        return new CubeCodeSystem(dimEncs);
+    }
+
+    private static Dictionary newDictionaryOfString() {
+        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("Dong");
+        builder.addValue("George");
+        builder.addValue("Jason");
+        builder.addValue("Kejia");
+        builder.addValue("Luke");
+        builder.addValue("Mahone");
+        builder.addValue("Qianhao");
+        builder.addValue("Shaofeng");
+        builder.addValue("Xu");
+        builder.addValue("Yang");
+        return builder.build(0);
+    }
+
+    private static Dictionary newDictionaryOfInteger() {
+        NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
+        builder.addValue("10");
+        builder.addValue("20");
+        builder.addValue("30");
+        builder.addValue("40");
+        builder.addValue("50");
+        builder.addValue("60");
+        builder.addValue("70");
+        builder.addValue("80");
+        builder.addValue("90");
+        builder.addValue("100");
+        return builder.build();
+    }
+
+    public static ImmutableBitSet setOf(int... values) {
+        BitSet set = new BitSet();
+        for (int i : values)
+            set.set(i);
+        return new ImmutableBitSet(set);
+    }
 }
diff --git a/pom.xml b/pom.xml
index 3fe15f2..75d33a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -284,11 +284,11 @@
         <artifactId>kylin-core-metadata</artifactId>
         <version>${project.version}</version>
       </dependency>
-<!--      <dependency>-->
-<!--        <groupId>org.apache.kylin</groupId>-->
-<!--        <artifactId>kylin-core-dictionary</artifactId>-->
-<!--        <version>${project.version}</version>-->
-<!--      </dependency>-->
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-core-dictionary</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-core-cube</artifactId>
@@ -1530,7 +1530,7 @@
     <module>external</module>
     <module>core-common</module>
     <module>core-metadata</module>
-<!--    <module>core-dictionary</module>-->
+    <module>core-dictionary</module>
     <module>core-cube</module>
     <module>core-job</module>
     <module>core-storage</module>
@@ -1602,7 +1602,7 @@
 
   <profiles>
     <profile>
-      <id>spark2</id>
+      <id>sandbox</id>
       <activation>
         <activeByDefault>true</activeByDefault>
         <property>
@@ -1745,6 +1745,114 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>cdh5.7</id>
+      <properties>
+        <hadoop2.version>2.6.0-cdh5.7.0</hadoop2.version>
+        <yarn.version>2.6.0-cdh5.7.0</yarn.version>
+        <hive.version>1.1.0-cdh5.7.0</hive.version>
+        <hive-hcatalog.version>1.1.0-cdh5.7.0</hive-hcatalog.version>
+        <hbase-hadoop2.version>1.2.0-cdh5.7.0</hbase-hadoop2.version>
+        <zookeeper.version>3.4.5-cdh5.7.0</zookeeper.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <fork>true</fork>
+              <meminitial>1024m</meminitial>
+              <maxmem>2048m</maxmem>
+            </configuration>
+          </plugin>
+
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-dependency-plugin</artifactId>
+      <executions>
+        <execution>
+          <id>copy-jamm</id>
+          <goals>
+            <goal>copy</goal>
+          </goals>
+          <phase>generate-test-resources</phase>
+          <configuration>
+            <artifactItems>
+              <artifactItem>
+                <groupId>com.github.jbellis</groupId>
+                <artifactId>jamm</artifactId>
+                <outputDirectory>${project.build.testOutputDirectory}
+                </outputDirectory>
+                <destFileName>jamm.jar</destFileName>
+              </artifactItem>
+            </artifactItems>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
+
+    <plugin>
+      <groupId>org.jacoco</groupId>
+      <artifactId>jacoco-maven-plugin</artifactId>
+      <configuration>
+        <append>true</append>
+        <destFile>
+          ${sonar.jacoco.reportPaths}
+        </destFile>
+      </configuration>
+      <executions>
+        <execution>
+          <id>pre-test</id>
+          <goals>
+            <goal>prepare-agent</goal>
+          </goals>
+          <configuration>
+            <propertyName>surefireArgLine</propertyName>
+          </configuration>
+        </execution>
+        <execution>
+          <id>post-test</id>
+          <phase>test</phase>
+          <goals>
+            <goal>report</goal>
+          </goals>
+        </execution>
+      </executions>
+    </plugin>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-surefire-plugin</artifactId>
+      <version>2.21.0</version>
+      <configuration>
+        <reportsDirectory>${project.basedir}/../target/surefire-reports
+        </reportsDirectory>
+        <excludes>
+          <exclude>**/IT*.java</exclude>
+          <exclude>org.apache.kylin.engine.spark2.NManualBuildAndQueryCuboidTest</exclude>
+          <exclude>org.apache.kylin.engine.spark2.NBuildAndQueryTest</exclude>
+          <exclude>org.apache.kylin.engine.spark2.NBadQueryAndPushDownTest</exclude>
+        </excludes>
+        <systemProperties>
+          <property>
+            <name>buildCubeUsingProvidedData</name>
+            <value>false</value>
+          </property>
+          <property>
+            <name>log4j.configuration</name>
+            <value>
+              file:${project.basedir}/../build/conf/kylin-tools-log4j.properties
+            </value>
+          </property>
+        </systemProperties>
+        <argLine>-javaagent:${project.build.testOutputDirectory}/jamm.jar
+          ${argLine} ${surefireArgLine}
+        </argLine>
+      </configuration>
+    </plugin>
+  </plugins>
+</build>
+</profile>
 <profile>
 <!-- This profile adds/overrides few features of the 'apache-release'
      profile in the parent pom. -->
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
new file mode 100644
index 0000000..ad2e20c
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.enumerator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.schema.OLAPTable;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class LookupTableEnumerator implements Enumerator<Object[]> {
+    private final static Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
+
+    private final ILookupTable lookupTable;
+    private final List<ColumnDesc> colDescs;
+    private final Object[] current;
+    private Iterator<String[]> iterator;
+
+    public LookupTableEnumerator(OLAPContext olapContext) {
+
+        //TODO: assuming LookupTableEnumerator is handled by a cube
+        CubeInstance cube = null;
+
+        if (olapContext.realization instanceof CubeInstance) {
+            cube = (CubeInstance) olapContext.realization;
+            ProjectInstance project = cube.getProjectInstance();
+            List<RealizationEntry> realizationEntries = project.getRealizationEntries();
+            String lookupTableName = olapContext.firstTableScan.getTableName();
+            CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
+
+            // Make force hit cube in lookup table
+            String forceHitCubeName = BackdoorToggles.getForceHitCube();
+            if (!StringUtil.isEmpty(forceHitCubeName)) {
+                String forceHitCubeNameLower = forceHitCubeName.toLowerCase(Locale.ROOT);
+                String[] forceHitCubeNames = forceHitCubeNameLower.split(",");
+                final Set<String> forceHitCubeNameSet = new HashSet<String>(Arrays.asList(forceHitCubeNames));
+                cube = cubeMgr.findLatestSnapshot(
+                        (List<RealizationEntry>) realizationEntries.stream()
+                                .filter(x -> forceHitCubeNameSet.contains(x.getRealization().toLowerCase(Locale.ROOT))),
+                        lookupTableName, cube);
+                olapContext.realization = cube;
+            } else {
+                cube = cubeMgr.findLatestSnapshot(realizationEntries, lookupTableName, cube);
+                olapContext.realization = cube;
+            }
+        } else if (olapContext.realization instanceof HybridInstance) {
+            final HybridInstance hybridInstance = (HybridInstance) olapContext.realization;
+            final IRealization latestRealization = hybridInstance.getLatestRealization();
+            if (latestRealization instanceof CubeInstance) {
+                cube = (CubeInstance) latestRealization;
+            } else {
+                throw new IllegalStateException();
+            }
+        }
+
+        String lookupTableName = olapContext.firstTableScan.getTableName();
+        DimensionDesc dim = cube.getDescriptor().findDimensionByTable(lookupTableName);
+        if (dim == null)
+            throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
+
+        CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
+        this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
+
+        OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
+        this.colDescs = olapTable.getSourceColumns();
+        this.current = new Object[colDescs.size()];
+
+        reset();
+    }
+
+    @Override
+    public boolean moveNext() {
+        boolean hasNext = iterator.hasNext();
+        if (hasNext) {
+            String[] row = iterator.next();
+            for (int i = 0, n = colDescs.size(); i < n; i++) {
+                ColumnDesc colDesc = colDescs.get(i);
+                int colIdx = colDesc.getZeroBasedIndex();
+                if (colIdx >= 0) {
+                    current[i] = Tuple.convertOptiqCellValue(row[colIdx], colDesc.getUpgradedType().getName());
+                } else {
+                    current[i] = null; // fake column
+                }
+            }
+        }
+        return hasNext;
+    }
+
+    @Override
+    public Object[] current() {
+        // NOTE if without the copy, sql_lookup/query03.sql will yields messy result. Very weird coz other lookup queries are all good.
+        return Arrays.copyOf(current, current.length);
+    }
+
+    @Override
+    public void reset() {
+        this.iterator = lookupTable.iterator();
+    }
+
+    @Override
+    public void close() {
+        try {
+            lookupTable.close();
+        } catch (IOException e) {
+            logger.error("error when close lookup table", e);
+        }
+    }
+
+}
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index 6c0b5b1..c094ff5 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -64,6 +64,10 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
         case OLAP:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator()
                     : new OLAPEnumerator(olapContext, optiqContext);
+        case LOOKUP_TABLE:
+            return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new LookupTableEnumerator(olapContext);
+        case COL_DICT:
+            return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new DictionaryEnumerator(olapContext);
         case HIVE:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new HiveEnumerator(olapContext);
         default:
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 6ab86a8..bfd6c4d 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -54,7 +55,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
 
     ColumnRowType columnRowType;
     OLAPContext context;
-    boolean autoJustTimezone = false;
+    boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
 
     public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
         super(cluster, traits, child, condition);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index dd79a4f..ffff10b 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -37,6 +37,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.util.NlsString;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.filter.CaseTupleFilter;
@@ -59,6 +60,7 @@ import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
@@ -68,7 +70,8 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
     // is the fact table is a streamingv2 table
     private boolean autoJustByTimezone = false;
-    private static final long TIME_ZONE_OFFSET = 0;
+    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
+            .getRawOffset();
 
     public TupleFilterVisitor(ColumnRowType inputRowType) {
         super(true);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a8285ae..00370e3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
@@ -308,20 +308,20 @@ public class CubeController extends BasicController {
      *
      * @throws IOException
      */
-//    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
-//            RequestMethod.PUT }, produces = { "application/json" })
-//    @ResponseBody
-//    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
-//            @RequestParam(value = "lookupTable") String lookupTable) {
-//        try {
-//            final CubeManager cubeMgr = cubeService.getCubeManager();
-//            final CubeInstance cube = cubeMgr.getCube(cubeName);
-//            return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
-//        } catch (IOException e) {
-//            logger.error(e.getLocalizedMessage(), e);
-//            throw new InternalErrorException(e.getLocalizedMessage(), e);
-//        }
-//    }
+    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
+            RequestMethod.PUT }, produces = { "application/json" })
+    @ResponseBody
+    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
+            @RequestParam(value = "lookupTable") String lookupTable) {
+        try {
+            final CubeManager cubeMgr = cubeService.getCubeManager();
+            final CubeInstance cube = cubeMgr.getCube(cubeName);
+            return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
+        } catch (IOException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage(), e);
+        }
+    }
 
     /**
      * Delete a cube segment
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index dd45599..bd30109 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -64,7 +64,7 @@ import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-//import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -592,20 +592,20 @@ public class CubeService extends BasicService implements InitializingBean {
         return getCubeManager().updateCube(update);
     }
 
-//    public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
-//            throws IOException {
-//        aclEvaluate.checkProjectOperationPermission(cube);
-//        Message msg = MsgPicker.getMsg();
-//        TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
-//        if (tableDesc.isView()) {
-//            throw new BadRequestException(
-//                    String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
-//        }
-//        CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
-//        getCubeManager().buildSnapshotTable(seg, lookupTable, null);
-//
-//        return cube;
-//    }
+    public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
+            throws IOException {
+        aclEvaluate.checkProjectOperationPermission(cube);
+        Message msg = MsgPicker.getMsg();
+        TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
+        if (tableDesc.isView()) {
+            throw new BadRequestException(
+                    String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
+        }
+        CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
+        getCubeManager().buildSnapshotTable(seg, lookupTable, null);
+
+        return cube;
+    }
 
     public CubeInstance deleteSegmentById(CubeInstance cube, String uuid) throws IOException {
         aclEvaluate.checkProjectWritePermission(cube);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 966fd1f..6f282ff 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -47,6 +47,9 @@ import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.spark.source.CsvSource;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.CsvColumnDesc;
@@ -60,6 +63,8 @@ import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.response.TableSnapshotResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
@@ -357,32 +362,38 @@ public class TableService extends BasicService {
 //    }
 
     public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
-        return Lists.newArrayList();
+        TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+        if (SourceManager.getSource(tableDesc).getClass() == CsvSource.class) {
+            return new ArrayList<>();
+        }
+        IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
+        TableSignature signature = hiveTable.getSignature();
+        return internalGetLookupTableSnapshots(tableName, signature);
+    }
+
+    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
+            throws IOException {
+        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
+        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
+
+        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
+
+        List<TableSnapshotResponse> result = Lists.newArrayList();
+
+        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
+            TableSnapshotResponse response = new TableSnapshotResponse();
+            response.setSnapshotID(metaStoreTableSnapshot.getId());
+            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
+            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
+            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
+            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
+            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
+            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
+            result.add(response);
+        }
+
+        return result;
     }
-//
-//    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
-//            throws IOException {
-//        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
-//        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
-//
-//        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
-//
-//        List<TableSnapshotResponse> result = Lists.newArrayList();
-//
-//        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
-//            TableSnapshotResponse response = new TableSnapshotResponse();
-//            response.setSnapshotID(metaStoreTableSnapshot.getId());
-//            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
-//            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
-//            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
-//            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
-//            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
-//            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
-//            result.add(response);
-//        }
-//
-//        return result;
-//    }
 
     /**
      * @return Map of SnapshotID, CubeName or SegmentName list
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 6236ef9..8ff5719 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Base64;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Locale;
 import java.util.Collections;
@@ -92,15 +93,15 @@ public class HiveInputBase {
 
             // create global dict
             KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
-//            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
-//            if (mrHiveDictColumns.length > 0) {
-//                String globalDictDatabase = dictConfig.getMrHiveDictDB();
-//                if (null == globalDictDatabase) {
-//                    throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
-//                }
-//                String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
-//                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
-//            }
+            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+            if (mrHiveDictColumns.length > 0) {
+                String globalDictDatabase = dictConfig.getMrHiveDictDB();
+                if (null == globalDictDatabase) {
+                    throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
+                }
+                String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
+                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
+            }
 
             // then count and redistribute
             if (cubeConfig.isHiveRedistributeEnabled()) {
@@ -288,13 +289,13 @@ public class HiveInputBase {
             deleteTables.add(getIntermediateTableIdentity());
 
             // mr-hive dict and inner table do not need delete hdfs
-//            String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
-//            if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
-//                String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
-//                String tableName = dictDb + "." + flatDesc.getTableName() + "_"
-//                        + MRHiveDictUtil.DictHiveType.GroupBy.getName();
-//                deleteTables.add(tableName);
-//            }
+            String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
+            if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
+                String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
+                String tableName = dictDb + "." + flatDesc.getTableName() + "_"
+                        + MRHiveDictUtil.DictHiveType.GroupBy.getName();
+                deleteTables.add(tableName);
+            }
             step.setIntermediateTables(deleteTables);
 
             step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));