Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/06 03:51:54 UTC

[kylin] 01/01: Revert "Remove unused property, class and maven module which is for Kylin 3 only"

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch revert-1679-kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d92e40878612fe1d4297169365dee0642ec45bb4
Author: Xiaoxiang Yu <xx...@apache.org>
AuthorDate: Tue Jul 6 11:50:58 2021 +0800

    Revert "Remove unused property, class and maven module which is for Kylin 3 only"
    
    This reverts commit e9291e3b232cb628bbfa8dc9fc21fbab1d491a19.
---
 .../kylin/engine/mr/SortedColumnDFSFile.java       |    6 +-
 .../engine/mr/common/MapReduceExecutable.java      |    6 +
 .../kylin/engine/mr/common/MapReduceUtil.java      |  267 ++--
 .../org/apache/kylin/common/KylinConfigBase.java   |  813 ++++++++----
 .../apache/kylin/common/annotation/ConfigTag.java  |    3 -
 .../org/apache/kylin/common/util/HadoopUtil.java   |    4 +
 .../org/apache/kylin/common/KylinConfigTest.java   |   20 +-
 core-cube/pom.xml                                  |    8 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |  341 ++---
 .../java/org/apache/kylin/cube/CubeSegment.java    |   34 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     |  187 +++
 .../apache/kylin/cube/cli/DumpDictionaryCLI.java   |   62 +
 .../java/org/apache/kylin/cube/model/CubeDesc.java |   46 +-
 .../apache/kylin/cube/model/SnapshotTableDesc.java |   10 +-
 .../cube/model/validation/rule/DictionaryRule.java |    6 +
 .../org/apache/kylin/cube/util/CubingUtils.java    |  284 ++--
 .../model/validation/rule/DictionaryRuleTest.java  |   14 +
 .../org/apache/kylin/dict/DictionaryGenerator.java |  382 +++---
 .../apache/kylin/dict/lookup}/ILookupTable.java    |    2 +-
 .../apache/kylin/dict/lookup/SnapshotManager.java  |   12 +-
 .../java/org/apache/kylin/job/JoinedFlatTable.java |   22 +-
 .../kylin/dimension/DimensionEncodingFactory.java  |    1 -
 .../org/apache/kylin/storage/StorageFactory.java   |   13 +
 .../storage/gtrecord/CubeScanRangePlanner.java     |   15 +-
 .../kylin/storage/gtrecord/CubeSegmentScanner.java |    6 +
 .../kylin/storage/gtrecord/CubeTupleConverter.java |    4 +-
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |    2 +-
 .../storage/translate/DerivedFilterTranslator.java |    2 +-
 .../kylin/storage/gtrecord/DictGridTableTest.java  | 1380 ++++++++++----------
 pom.xml                                            |  122 +-
 .../query/enumerator/LookupTableEnumerator.java    |  147 +++
 .../apache/kylin/query/enumerator/OLAPQuery.java   |    4 +
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |    3 +-
 .../query/relnode/visitor/TupleFilterVisitor.java  |    5 +-
 .../kylin/rest/controller/CubeController.java      |   30 +-
 .../org/apache/kylin/rest/service/CubeService.java |   30 +-
 .../apache/kylin/rest/service/TableService.java    |   61 +-
 .../apache/kylin/source/hive/HiveInputBase.java    |   33 +-
 38 files changed, 2673 insertions(+), 1714 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
index 507898d..bcf4b98 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.dict.ByteComparator;
+import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
@@ -90,9 +92,9 @@ public class SortedColumnDFSFile implements IReadableTable {
     }
 
     private Comparator<String> getComparatorByType(DataType type) {
-        Comparator<String> comparator = null;
+        Comparator<String> comparator;
         if (!type.isNumberFamily()) {
-//            comparator = new ByteComparator<>(new StringBytesConverter());
+            comparator = new ByteComparator<>(new StringBytesConverter());
         } else if (type.isIntegerFamily()) {
             comparator = new Comparator<String>() {
                 @Override
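
For context on the hunk above: the revert reinstates byte-order comparison for non-numeric dictionary columns, where the previously commented-out line had left the comparator unassigned. Below is a minimal stand-alone sketch of that ordering, assuming values are compared by their unsigned UTF-8 bytes; the class name is hypothetical and this is not Kylin's actual ByteComparator:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Comparator;

    // Hypothetical sketch, not Kylin's ByteComparator: orders strings by the
    // unsigned byte order of their UTF-8 encoding.
    public class ByteOrderComparatorSketch implements Comparator<String> {
        @Override
        public int compare(String a, String b) {
            byte[] ba = a.getBytes(StandardCharsets.UTF_8);
            byte[] bb = b.getBytes(StandardCharsets.UTF_8);
            int n = Math.min(ba.length, bb.length);
            for (int i = 0; i < n; i++) {
                int cmp = (ba[i] & 0xFF) - (bb[i] & 0xFF); // compare bytes as unsigned
                if (cmp != 0) {
                    return cmp;
                }
            }
            return ba.length - bb.length; // on a common prefix, shorter sorts first
        }

        public static void main(String[] args) {
            String[] values = { "b", "a", "ab" };
            Arrays.sort(values, new ByteOrderComparatorSketch());
            System.out.println(Arrays.toString(values)); // [a, ab, b]
        }
    }
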
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index c6c80f4..4978fa0 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -500,6 +500,12 @@ public class MapReduceExecutable extends AbstractExecutable {
         for (Map.Entry<String, String> entry : configOverride.getMRConfigOverride().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
+        if (conf.get("mapreduce.job.is-mem-hungry") != null
+                && Boolean.parseBoolean(conf.get("mapreduce.job.is-mem-hungry"))) {
+            for (Map.Entry<String, String> entry : configOverride.getMemHungryConfigOverride().entrySet()) {
+                conf.set(entry.getKey(), entry.getValue());
+            }
+        }
 
         if (StringUtils.isNotBlank(cubeName)) {
             remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME);
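
The restored block above layers a second round of overrides on top of the base MR config whenever the job flags itself as memory-hungry via mapreduce.job.is-mem-hungry. A self-contained sketch of that merge order, using plain maps in place of Kylin's KylinConfig and Hadoop Configuration objects (the property values are illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MemHungryOverrideSketch {
        public static void main(String[] args) {
            // Base overrides, as collected from kylin.engine.mr.config-override.*
            Map<String, String> conf = new LinkedHashMap<>();
            conf.put("mapreduce.map.memory.mb", "2048");
            conf.put("mapreduce.job.is-mem-hungry", "true");

            // Second layer, from kylin.engine.mr.mem-hungry-config-override.*
            Map<String, String> memHungry = new LinkedHashMap<>();
            memHungry.put("mapreduce.map.memory.mb", "6144");

            // Same guard as the restored code: apply only when the flag is set.
            if (Boolean.parseBoolean(conf.get("mapreduce.job.is-mem-hungry"))) {
                conf.putAll(memHungry); // mem-hungry values override the base layer
            }
            System.out.println(conf.get("mapreduce.map.memory.mb")); // prints 6144
        }
    }
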
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 4fa025b..ecde4aa 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -18,130 +18,147 @@
 
 package org.apache.kylin.engine.mr.common;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
 public class MapReduceUtil {
-//
-//    private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
-//
-//    /**
-//     * @return reducer number for calculating hll
-//     */
-//    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
-//        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
-//        int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
-//
-//        int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
-//        if (shardBase > hllMaxReducerNumber) {
-//            shardBase = hllMaxReducerNumber;
-//        }
-//        return shardBase;
-//    }
-//
-//    /**
-//     * @param cuboidScheduler specified can provide more flexibility
-//     * */
-//    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
-//            double totalMapInputMB, int level)
-//            throws ClassNotFoundException, IOException, InterruptedException, JobException {
-//        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-//        KylinConfig kylinConfig = cubeDesc.getConfig();
-//
-//        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-//        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-//        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
-//                + level);
-//
-//        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
-//
-//        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-//
-//        if (level == -1) {
-//            //merge case
-//            double estimatedSize = cubeStatsReader.estimateCubeSize();
-//            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
-//            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
-//                    totalMapInputMB, adjustedCurrentLayerSizeEst);
-//        } else if (level == 0) {
-//            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
-//            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-//            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-//        } else {
-//            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-//            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-//            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-//            logger.debug(
-//                    "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
-//                    totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-//        }
-//
-//        // number of reduce tasks
-//        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-//
-//        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-//        if (cubeDesc.hasMemoryHungryMeasures()) {
-//            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
-//            numReduceTasks = numReduceTasks * 4;
-//        }
-//
-//        // at least 1 reducer by default
-//        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-//        // no more than 500 reducer by default
-//        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-//
-//        return numReduceTasks;
-//    }
-//
-//    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
-//            throws IOException {
-//        KylinConfig kylinConfig = cubeSeg.getConfig();
-//
-//        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
-//        double totalSizeInM = 0;
-//        for (Double cuboidSize : cubeSizeMap.values()) {
-//            totalSizeInM += cuboidSize;
-//        }
-//        return getReduceTaskNum(totalSizeInM, kylinConfig);
-//    }
-//
-//    // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
-//    public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
-//        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
-//
-//        Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
-//        overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
-//        overlapCuboids.add(baseCuboidId);
-//
-//        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
-//                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
-//        Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
-//                cuboidStats.getSecond());
-//        double totalSizeInM = 0;
-//        for (Double cuboidSize : cubeSizeMap.values()) {
-//            totalSizeInM += cuboidSize;
-//        }
-//
-//        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
-//
-//        KylinConfig kylinConfig = cubeSeg.getConfig();
-//        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
-//        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
-//        return new Pair<>(nBase + nOther, nBase);
-//    }
-//
-//    private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
-//        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-//        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-//
-//        // number of reduce tasks
-//        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
-//
-//        // at least 1 reducer by default
-//        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-//        // no more than 500 reducer by default
-//        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-//
-//        logger.info("Having total map input MB " + Math.round(totalSizeInM));
-//        logger.info("Having per reduce MB " + perReduceInputMB);
-//        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
-//        return numReduceTasks;
-//    }
+
+    private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
+
+    /**
+     * @return reducer number for calculating hll
+     */
+    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
+        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
+        int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
+
+        int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
+        if (shardBase > hllMaxReducerNumber) {
+            shardBase = hllMaxReducerNumber;
+        }
+        return shardBase;
+    }
+
+    /**
+     * @param cuboidScheduler specified can provide more flexibility
+     * */
+    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
+            double totalMapInputMB, int level)
+            throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+                + level);
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+
+        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+        if (level == -1) {
+            //merge case
+            double estimatedSize = cubeStatsReader.estimateCubeSize();
+            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
+                    totalMapInputMB, adjustedCurrentLayerSizeEst);
+        } else if (level == 0) {
+            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else {
+            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+            logger.debug(
+                    "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+                    totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+        }
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        return numReduceTasks;
+    }
+
+    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
+            throws IOException {
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+        return getReduceTaskNum(totalSizeInM, kylinConfig);
+    }
+
+    // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
+    public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
+        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
+
+        Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+        overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
+        overlapCuboids.add(baseCuboidId);
+
+        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
+                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
+        Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
+                cuboidStats.getSecond());
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+
+        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
+
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
+        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
+        return new Pair<>(nBase + nOther, nBase);
+    }
+
+    private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        logger.info("Having total map input MB " + Math.round(totalSizeInM));
+        logger.info("Having per reduce MB " + perReduceInputMB);
+        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+        return numReduceTasks;
+    }
 }
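
The reducer-count arithmetic restored in getReduceTaskNum above is easiest to follow with numbers plugged in. A stand-alone sketch using the defaults named in the restored code (500 MB per reducer, count ratio 1.0, at least 1 and at most 500 reducers):

    public class ReduceTaskNumSketch {
        // Mirrors the restored getReduceTaskNum clamping with its defaults:
        // kylin.engine.mr.reduce-input-mb=500, reduce-count-ratio=1.0,
        // min-reducer-number=1, max-reducer-number=500.
        static int reduceTaskNum(double totalSizeInMB, double perReduceInputMB,
                                 double reduceCountRatio, int min, int max) {
            int num = (int) Math.round(totalSizeInMB / perReduceInputMB * reduceCountRatio);
            num = Math.max(min, num); // at least 1 reducer by default
            num = Math.min(max, num); // no more than 500 reducers by default
            return num;
        }

        public static void main(String[] args) {
            System.out.println(reduceTaskNum(120, 500, 1.0, 1, 500));    // 1, clamped up
            System.out.println(reduceTaskNum(2600, 500, 1.0, 1, 500));   // 5
            System.out.println(reduceTaskNum(400000, 500, 1.0, 1, 500)); // 500, clamped down
        }
    }
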
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index fc76506..39f8eae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -292,14 +292,6 @@ public abstract class KylinConfigBase implements Serializable {
                 || "LOCAL".equals(getOptional("kylin.env", "DEV"));
     }
 
-    public boolean isUTEnv() {
-        return "UT".equals(getDeployEnv());
-    }
-
-    public boolean isLocalEnv() {
-        return "LOCAL".equals(getDeployEnv());
-    }
-
     public String getDeployEnv() {
         return getOptional("kylin.env", "DEV");
     }
@@ -351,7 +343,7 @@ public abstract class KylinConfigBase implements Serializable {
         String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
 
         if (root == null) {
-            return getHdfsWorkingDirectory();
+            return getJdbcHdfsWorkingDirectory();
         }
 
         Path path = new Path(root);
@@ -360,8 +352,12 @@ public abstract class KylinConfigBase implements Serializable {
                     "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
 
         // make sure path is qualified
-        FileSystem fs = HadoopUtil.getWorkingFileSystem();
-        path = fs.makeQualified(path);
+        try {
+            FileSystem fs = HadoopUtil.getReadFileSystem();
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
 
         root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
 
@@ -378,6 +374,28 @@ public abstract class KylinConfigBase implements Serializable {
         return cachedBigCellDirectory;
     }
 
+    public String getReadHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getHBaseClusterFs())) {
+            Path workingDir = new Path(getHdfsWorkingDirectory());
+            return new Path(getHBaseClusterFs(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+        }
+
+        return getHdfsWorkingDirectory();
+    }
+
+    private String getJdbcHdfsWorkingDirectory() {
+        if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
+            Path workingDir = new Path(getReadHdfsWorkingDirectory());
+            return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
+        }
+
+        return getReadHdfsWorkingDirectory();
+    }
+
+    private String getJdbcFileSystem() {
+        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
+    }
+
     public String getHdfsWorkingDirectory(String project) {
         if (isProjectIsolationEnabled() && project != null) {
             return new Path(getHdfsWorkingDirectory(), project).toString() + "/";
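
The getReadHdfsWorkingDirectory and getJdbcHdfsWorkingDirectory methods restored earlier in this hunk both rebase the working directory onto another file system by stripping its scheme and authority and grafting the bare path onto the target cluster. A sketch of that rebasing with Hadoop's Path API (the cluster names are made up):

    import org.apache.hadoop.fs.Path;

    public class WorkingDirRebaseSketch {
        public static void main(String[] args) {
            Path workingDir = new Path("hdfs://cluster-a/kylin/kylin_metadata");
            // e.g. the value of kylin.storage.columnar.jdbc.file-system
            String targetFs = "hdfs://cluster-b";
            // Drop "hdfs://cluster-a" and graft the bare path onto the target.
            Path rebased = new Path(targetFs, Path.getPathWithoutSchemeAndAuthority(workingDir));
            System.out.println(rebased + "/"); // hdfs://cluster-b/kylin/kylin_metadata/
        }
    }
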
@@ -461,7 +479,7 @@ public abstract class KylinConfigBase implements Serializable {
         Map<String, String> r = Maps.newLinkedHashMap();
         // ref constants in ISourceAware
         r.put("", "org.apache.kylin.common.persistence.FileResourceStore");
-//        r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
+        r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
         r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
         r.put("ifile", "org.apache.kylin.common.persistence.IdentifierFileResourceStore");
         r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
@@ -516,7 +534,6 @@ public abstract class KylinConfigBase implements Serializable {
         return (DistributedLockFactory) ClassUtil.newInstance(clsName);
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHBaseMappingAdapter() {
         return getOptional("kylin.metadata.hbasemapping-adapter");
     }
@@ -525,17 +542,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.metadata.check-copy-on-write", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseClientScannerTimeoutPeriod() {
         return getOptional("kylin.metadata.hbase-client-scanner-timeout-period", "10000");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseRpcTimeout() {
         return getOptional("kylin.metadata.hbase-rpc-timeout", "5000");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseClientRetriesNumber() {
         return getOptional("kylin.metadata.hbase-client-retries-number", "1");
     }
@@ -548,31 +562,119 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.metadata.able-change-string-to-datetime", "false"));
     }
 
-    public String getMetadataDialect() {
-        return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    // ============================================================================
+    // DICTIONARY & SNAPSHOT
+    // ============================================================================
+
+    public boolean isUseForestTrieDictionary() {
+        return Boolean.parseBoolean(getOptional("kylin.dictionary.use-forest-trie", TRUE));
     }
 
-    public boolean isJsonAlwaysSmallCell() {
-        return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
+    public long getTrieDictionaryForestMaxTrieSizeMB() {
+        return Integer.parseInt(getOptional("kylin.dictionary.forest-trie-max-mb", "500"));
     }
 
-    public int getSmallCellMetadataWarningThreshold() {
-        return Integer.parseInt(
-                getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
+    public long getCachedDictMaxEntrySize() {
+        return Long.parseLong(getOptional("kylin.dictionary.max-cache-entry", "3000"));
     }
 
-    public int getSmallCellMetadataErrorThreshold() {
-        return Integer.parseInt(
-                getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+    public int getCachedDictMaxSize() {
+        return Integer.parseInt(getOptional("kylin.dictionary.max-cache-size", "-1"));
     }
 
-    public int getJdbcResourceStoreMaxCellSize() {
-        return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+    public boolean isGrowingDictEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.growing-enabled", FALSE));
+    }
+
+    public boolean isDictResuable() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.resuable", FALSE));
+    }
+
+    public long getCachedDictionaryMaxEntrySize() {
+        return Long.parseLong(getOptional("kylin.dictionary.cached-dict-max-cache-entry", "50000"));
+    }
+
+    public int getAppendDictEntrySize() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-entry-size", "10000000"));
+    }
+
+    public int getAppendDictMaxVersions() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-max-versions", "3"));
+    }
+
+    public int getAppendDictVersionTTL() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-version-ttl", "259200000"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getCachedSnapshotMaxEntrySize() {
+        return Integer.parseInt(getOptional("kylin.snapshot.max-cache-entry", "500"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getTableSnapshotMaxMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getExtTableSnapshotShardingMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String getExtTableSnapshotLocalCachePath() {
+        return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public double getExtTableSnapshotLocalCacheMaxSizeGB() {
+        return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public long getExtTableSnapshotLocalCacheCheckVolatileRange() {
+        return Long.parseLong(getOptional("kylin.snapshot.ext.local.cache.check.volatile", "3600000"));
     }
 
+    @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.CUBE_LEVEL})
+    public boolean isShrunkenDictFromGlobalEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", TRUE));
+    }
+
+
     // ============================================================================
-    // DICTIONARY & SNAPSHOT
+    // Hive Global Dictionary
+    //
+    // ============================================================================
+
+    /**
+     * @return if mr-hive dict not enabled, return empty array
+     * else return array contains "{TABLE_NAME}_{COLUMN_NAME}"
+     */
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String[] getMrHiveDictColumns() {
+        String columnStr = getOptional("kylin.dictionary.mr-hive.columns", "");
+        if (!columnStr.equals("")) {
+            return columnStr.split(",");
+        }
+        return new String[0];
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String getMrHiveDictDB() {
+        return getOptional("kylin.dictionary.mr-hive.database", getHiveDatabaseForIntermediateTable());
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String getMrHiveDictTableSuffix() {
+        return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
+    }
+
     // ============================================================================
+    // Distributed/Spark Global dictionary
+    // Add wiki link here
+    // ============================================================================
+
 
     public int getGlobalDictV2MinHashPartitions() {
         return Integer.parseInt(getOptional("kylin.dictionary.globalV2-min-hash-partitions", "10"));
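
Among the getters restored earlier in this hunk, getMrHiveDictColumns returns an empty array when kylin.dictionary.mr-hive.columns is unset and otherwise a plain comma split into {TABLE_NAME}_{COLUMN_NAME} entries. A minimal sketch of the same parsing rule (the column names are made up):

    public class MrHiveDictColumnsSketch {
        // Same rule as the restored getter: empty string -> empty array,
        // otherwise a plain comma split (no trimming in the original either).
        static String[] parseDictColumns(String columnStr) {
            return columnStr.isEmpty() ? new String[0] : columnStr.split(",");
        }

        public static void main(String[] args) {
            System.out.println(parseDictColumns("").length);                      // 0
            System.out.println(parseDictColumns("KYLIN_SALES_SELLER_ID").length); // 1
            System.out.println(parseDictColumns("T1_C1,T2_C2")[1]);               // T2_C2
        }
    }
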
@@ -606,42 +708,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.dictionary.globalV2-check", "true"));
     }
 
-    /*
-     * Detect dataset skew in dictionary encode step.
-     * */
-    public boolean detectDataSkewInDictEncodingEnabled() {
-        return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
-    }
-
-    /*
-     * In some data skew cases, the repartition step during dictionary encoding will be slow.
-     * We can choose to sample from the dataset to detect skewed. This configuration is used to set the sample rate.
-     * */
-    public double sampleRateInEncodingSkewDetection() {
-        return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
-    }
-
-    /*
-     * In KYLIN4, dictionaries are hashed into several buckets, column data are repartitioned by the same hash algorithm
-     * during encoding step too. In data skew cases, the repartition step will be very slow. Kylin will automatically
-     * sample from the source to detect skewed data and repartition these skewed data to random partitions.
-     * This configuration is used to set the skew data threshhold, valued from 0 to 1.
-     * e.g.
-     *   if you set this value to 0.05, for each value that takes up more than 5% percent of the total will be regarded
-     *   as skew data, as a result the skewed data will be no more than 20 records
-     * */
-    public double skewPercentageThreshHold() {
-        return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
-    }
-
-    public boolean isSnapshotParallelBuildEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
-    }
-
-    public int snapshotParallelBuildTimeoutSeconds() {
-        return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
-    }
-
     // ============================================================================
     // CUBE
     // ============================================================================
@@ -973,6 +1039,14 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.job.error-record-threshold", "0"));
     }
 
+    public boolean isAdvancedFlatTableUsed() {
+        return Boolean.parseBoolean(getOptional("kylin.job.use-advanced-flat-table", FALSE));
+    }
+
+    public String getAdvancedFlatTableClass() {
+        return getOptional("kylin.job.advanced-flat-table.class");
+    }
+
     public String getJobTrackingURLPattern() {
         return getOptional("kylin.job.tracking-url-pattern", "");
     }
@@ -1032,7 +1106,7 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    @Deprecated
     public String getHiveUrl() {
         return getOptional("kylin.source.hive.connection-url", "");
     }
@@ -1040,7 +1114,7 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    @Deprecated
     public String getHiveUser() {
         return getOptional("kylin.source.hive.connection-user", "");
     }
@@ -1048,87 +1122,71 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    @Deprecated
     public String getHivePassword() {
         return getOptional("kylin.source.hive.connection-password", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getHiveConfigOverride() {
         return getPropertiesByPrefix("kylin.source.hive.config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("kylin.source.hive.table-location." + table.toUpperCase(Locale.ROOT));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isHiveKeepFlatTable() {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.keep-flat-table", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveDatabaseForIntermediateTable() {
         return CliCommandExecutor.checkHiveProperty(this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatTableStorageFormat() {
         return this.getOptional("kylin.source.hive.flat-table-storage-format", "SEQUENCEFILE");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatTableFieldDelimiter() {
         return this.getOptional("kylin.source.hive.flat-table-field-delimiter", "\u001F");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isHiveRedistributeEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.redistribute-flat-table", TRUE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveClientMode() {
         return getOptional("kylin.source.hive.client", "cli");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveBeelineShell() {
         return getOptional("kylin.source.hive.beeline-shell", "beeline");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveBeelineParams() {
         return getOptional("kylin.source.hive.beeline-params", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean getEnableSparkSqlForTableOps() {
         return Boolean.parseBoolean(getOptional("kylin.source.hive.enable-sparksql-for-table-ops", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkSqlBeelineShell() {
         return getOptional("kylin.source.hive.sparksql-beeline-shell", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkSqlBeelineParams() {
         return getOptional("kylin.source.hive.sparksql-beeline-params", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean getHiveTableDirCreateFirst() {
         return Boolean.parseBoolean(getOptional("kylin.source.hive.table-dir-create-first", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatHiveTableClusterByDictColumn() {
         return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHiveRedistributeColumnCount() {
         return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3"));
     }
@@ -1185,7 +1243,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // SOURCE.KAFKA(Removed)
+    // SOURCE.KAFKA
     // ============================================================================
 
     @ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1194,7 +1252,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // SOURCE.JDBC(Removed)
+    // SOURCE.JDBC
     // ============================================================================
 
     @ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1253,7 +1311,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // STORAGE.HBASE(Removed)
+    // STORAGE.HBASE
     // ============================================================================
 
     public Map<Integer, String> getStorageEngines() {
@@ -1463,7 +1521,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // ENGINE.MR(Removed)
+    // ENGINE.MR
     // ============================================================================
 
     public Map<Integer, String> getJobEngines() {
@@ -1482,6 +1540,8 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.default", "6"));
     }
 
+
+
     public String getKylinJobJarPath() {
         final String jobJar = getOptional(KYLIN_ENGINE_MR_JOB_JAR);
         if (StringUtils.isNotEmpty(jobJar)) {
@@ -1499,63 +1559,76 @@ public abstract class KylinConfigBase implements Serializable {
         System.setProperty(KYLIN_ENGINE_MR_JOB_JAR, path);
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getKylinJobMRLibDir() {
         return getOptional("kylin.engine.mr.lib-dir", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getMRConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.mr.config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getHadoopJobMinReducerNumber() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
+    // used for some mem-hungry step
+    public Map<String, String> getMemHungryConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.mr.mem-hungry-config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getHadoopJobMaxReducerNumber() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
+    public Map<String, String> getUHCMRConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getHadoopJobMapperInputRows() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
+    public Map<String, String> getBaseCuboidMRConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override.");
+    }
+
+    public Map<String, String> getSparkConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getFlinkConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.flink-conf.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
         return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
         return getPropertiesByPrefix("kylin.engine.flink-conf-" + configName + ".");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public double getDefaultHadoopJobReducerInputMB() {
+        return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
+    }
+
+    public double getDefaultHadoopJobReducerCountRatio() {
+        return Double.parseDouble(getOptional("kylin.engine.mr.reduce-count-ratio", "1.0"));
+    }
+
+    public int getHadoopJobMinReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
+    }
+
+    public int getHadoopJobMaxReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
+    }
+
+    public int getHadoopJobMapperInputRows() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
+    }
+
     public int getCuboidStatsCalculatorMaxNumber() {
         // set 1 to disable multi-thread statistics calculation
         return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getCuboidNumberPerStatsCalculator() {
         return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHadoopJobPerReducerHLLCuboidNumber() {
         return Integer.parseInt(getOptional("kylin.engine.mr.per-reducer-hll-cuboid-number", "100"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHadoopJobHLLMaxReducerNumber() {
         // by default multi-reducer hll calculation is disabled
         return Integer.parseInt(getOptional("kylin.engine.mr.hll-max-reducer-number", "1"));
@@ -1566,27 +1639,22 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "3"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isBuildUHCDictWithMREnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict-in-additional-step", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isBuildDictInReducerEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", TRUE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getYarnStatusCheckUrl() {
         return getOptional("kylin.engine.mr.yarn-check-status-url", null);
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getYarnStatusCheckIntervalSeconds() {
         return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isUseLocalClasspathEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.use-local-classpath", TRUE));
     }
@@ -1595,114 +1663,122 @@ public abstract class KylinConfigBase implements Serializable {
      * different version hive use different UNION style
      * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union
      */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveUnionStyle() {
         return getOptional("kylin.hive.union.style", "UNION");
     }
 
     // ============================================================================
-    // ENGINE.SPARK (DEPRECATED)
+    // ENGINE.SPARK
     // ============================================================================
 
     public String getHadoopConfDir() {
         return getOptional("kylin.env.hadoop-conf-dir", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    /**
+     * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
+     */
+    public String getSparderAppName() {
+        String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
+        if (StringUtils.isEmpty(customSparderAppName)) {
+            customSparderAppName =
+                    "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
+        }
+        return customSparderAppName;
+    }
+
     public String getSparkAdditionalJars() {
         return getOptional("kylin.engine.spark.additional-jars", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlinkAdditionalJars() {
         return getOptional("kylin.engine.flink.additional-jars", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public float getSparkRDDPartitionCutMB() {
         return Float.parseFloat(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public float getFlinkPartitionCutMB() {
         return Float.parseFloat(getOptional("kylin.engine.flink.partition-cut-mb", "10.0"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getSparkMinPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.min-partition", "1"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getFlinkMinPartition() {
         return Integer.parseInt(getOptional("kylin.engine.flink.min-partition", "1"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getSparkMaxPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getFlinkMaxPartition() {
         return Integer.parseInt(getOptional("kylin.engine.flink.max-partition", "5000"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkStorageLevel() {
         return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isSparkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public boolean isSparkFactDistinctEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
+    }
+
+    public boolean isSparkUHCDictionaryEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", "false"));
+    }
+
+    public boolean isSparkCardinalityEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
+    }
+
     public int getSparkOutputMaxSize() {
         return Integer.valueOf(getOptional("kylin.engine.spark.output.max-size", "10485760"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isSparkDimensionDictionaryEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isFlinkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
     }
 
+    public boolean isSparCreateHiveTableViaSparkEnable() {
+        return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", "false"));
+    }
+
     // ============================================================================
     // ENGINE.LIVY
     // ============================================================================
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isLivyEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.livy-conf.livy-enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getLivyRestApiBacktick() {
         return getOptional("kylin.engine.livy.backtick.quote", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getLivyUrl() {
         return getOptional("kylin.engine.livy-conf.livy-url");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyKey() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-key.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyArr() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-arr.");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyMap() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
     }
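
Earlier in this hunk, getSparderAppName documents its fallback: when kylin.query.sparder-context.app-name is empty, the name is derived from the server REST address, e.g. 'sparder_on_localhost-7070'. A stand-alone sketch of that derivation (it assumes a non-null custom name, whereas the original's StringUtils.isEmpty also handles null):

    public class SparderAppNameSketch {
        // Mirrors the restored fallback: replace ':' with '-' in the server
        // REST address and prefix "sparder_on_".
        static String sparderAppName(String customName, String serverRestAddress) {
            return customName.isEmpty()
                    ? "sparder_on_" + serverRestAddress.replaceAll(":", "-")
                    : customName;
        }

        public static void main(String[] args) {
            System.out.println(sparderAppName("", "localhost:7070"));           // sparder_on_localhost-7070
            System.out.println(sparderAppName("my-sparder", "localhost:7070")); // my-sparder
        }
    }
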
@@ -1763,7 +1839,6 @@ public abstract class KylinConfigBase implements Serializable {
 
     // check KYLIN-3358, need deploy coprocessor if enabled
     // finally should be deprecated
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isDynamicColumnEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.enable-dynamic-column", FALSE));
     }
@@ -1818,13 +1893,11 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public long getQueryMaxScanBytes() {
         long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
         return value > 0 ? value : Long.MAX_VALUE;
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public long getQueryMaxReturnRows() {
         return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000"));
     }
@@ -1942,7 +2015,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct", "5000000"));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getUDFs() {
         Map<String, String> udfMap = Maps.newLinkedHashMap();
         udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
@@ -1975,12 +2047,10 @@ public abstract class KylinConfigBase implements Serializable {
         return this.getOptional("kylin.query.schema-factory", "org.apache.kylin.query.schema.OLAPSchemaFactory");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getPushDownRunnerClassName() {
         return getOptional("kylin.query.pushdown.runner-class-name", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public List<String> getPushDownRunnerIds() {
         List<String> ids = Lists.newArrayList();
         String idsStr = getOptional("kylin.query.pushdown.runner.ids", "");
@@ -1992,7 +2062,6 @@ public abstract class KylinConfigBase implements Serializable {
         return ids;
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String[] getPushDownConverterClassNames() {
         return getOptionalStringArray("kylin.query.pushdown.converter-class-names",
                 new String[]{"org.apache.kylin.source.adhocquery.HivePushDownConverter"});
@@ -2002,7 +2071,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.query.pushdown.cache-enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcUrl(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.url", "");
@@ -2011,7 +2079,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcDriverClass(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.driver", "");
@@ -2020,7 +2087,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcUsername(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.username", "");
@@ -2029,7 +2095,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcPassword(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.password", "");
@@ -2038,7 +2103,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMaxTotal(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2051,7 +2115,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMaxIdle(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2064,7 +2127,6 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMinIdle(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2378,23 +2440,221 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.tool.auto-migrate-cube.dest-config", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public String getJdbcSourceAdaptor() {
-        return getOptional("kylin.source.jdbc.adaptor");
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public boolean isLimitPushDownEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
-    }
-
     // ============================================================================
-    // Realtime streaming (Removed)
+    // jdbc metadata resource store
     // ============================================================================
 
+    public String getMetadataDialect() {
+        return getOptional("kylin.metadata.jdbc.dialect", "mysql");
+    }
+
+    public boolean isJsonAlwaysSmallCell() {
+        return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
+    }
+
+    public int getSmallCellMetadataWarningThreshold() {
+        return Integer.parseInt(
+                getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
+    }
+
+    public int getSmallCellMetadataErrorThreshold() {
+        return Integer.parseInt(
+                getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
+    }
+
+    public int getJdbcResourceStoreMaxCellSize() {
+        return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
+    }
+
+    public String getJdbcSourceAdaptor() {
+        return getOptional("kylin.source.jdbc.adaptor");
+    }
+
+    public boolean isLimitPushDownEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
+    }
+
+    // ============================================================================
+    // Realtime streaming
+    // ============================================================================
+    public String getStreamingStoreClass() {
+        return getOptional("kylin.stream.store.class",
+                "org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore");
+    }
+
+    public String getStreamingBasicCuboidJobDFSBlockSize() {
+        return getOptional("kylin.stream.job.dfs.block.size", String.valueOf(16 * 1024 * 1024));
+    }
+
+    public String getStreamingIndexPath() {
+        return getOptional("kylin.stream.index.path", "stream_index");
+    }
+
+    public int getStreamingCubeConsumerTasksNum() {
+        return Integer.parseInt(getOptional("kylin.stream.cube-num-of-consumer-tasks", "3"));
+    }
+
+    public int getStreamingCubeWindowInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.cube.window", "3600"));
+    }
+
+    public int getStreamingCubeDurationInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.cube.duration", "7200"));
+    }
+
+    public int getStreamingCubeMaxDurationInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.cube.duration.max", "43200"));
+    }
+
+    public int getStreamingCheckPointFileMaxNum() {
+        return Integer.parseInt(getOptional("kylin.stream.checkpoint.file.max.num", "5"));
+    }
+
+    public int getStreamingCheckPointIntervalsInSecs() {
+        return Integer.parseInt(getOptional("kylin.stream.index.checkpoint.intervals", "300"));
+    }
+
+    public int getStreamingIndexMaxRows() {
+        return Integer.parseInt(getOptional("kylin.stream.index.maxrows", "50000"));
+    }
+
+    public int getStreamingMaxImmutableSegments() {
+        return Integer.parseInt(getOptional("kylin.stream.immutable.segments.max.num", "100"));
+    }
+
+    public boolean isStreamingConsumeFromLatestOffsets() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.consume.offsets.latest", "true"));
+    }
+
+    public String getStreamingNode() {
+        return getOptional("kylin.stream.node", null);
+    }
+
+    public Map<String, String> getStreamingNodeProperties() {
+        return getPropertiesByPrefix("kylin.stream.node");
+    }
+
+    public String getStreamingMetadataStoreType() {
+        return getOptional("kylin.stream.metadata.store.type", "zk");
+    }
+
+    public String getStreamingSegmentRetentionPolicy() {
+        return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
+    }
+
+    public String getStreamingAssigner() {
+        return getOptional("kylin.stream.assigner", "DefaultAssigner");
+    }
+
+    public int getCoordinatorHttpClientTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.coordinator.client.timeout.millsecond", "5000"));
+    }
+
+    public int getReceiverHttpClientTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.client.timeout.millsecond", "5000"));
+    }
+
+    public int getStreamingReceiverHttpMaxThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.http.max.threads", "200"));
+    }
+
+    public int getStreamingReceiverHttpMinThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.http.min.threads", "10"));
+    }
+
+    public int getStreamingReceiverQueryCoreThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
+    }
+
+    public int getStreamingReceiverQueryMaxThreads() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
+    }
+
+    public int getStreamingReceiverUseThreadsPerQuery() {
+        return Integer.parseInt(getOptional("kylin.stream.receiver.use-threads-per-query", "8"));
+    }
+
+    public int getStreamingRPCHttpConnTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.rpc.http.connect.timeout", "10000"));
+    }
+
+    public int getStreamingRPCHttpReadTimeout() {
+        return Integer.parseInt(getOptional("kylin.stream.rpc.http.read.timeout", "60000"));
+    }
+
+    public boolean isStreamingBuildAdditionalCuboids() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.build.additional.cuboids", "false"));
+    }
+
+    public Map<String, String> getStreamingSegmentRetentionPolicyProperties(String policyName) {
+        return getPropertiesByPrefix("kylin.stream.segment.retention.policy." + policyName + ".");
+    }
+
+    public int getStreamingMaxFragmentsInSegment() {
+        return Integer.parseInt(getOptional("kylin.stream.segment-max-fragments", "50"));
+    }
+
+    public int getStreamingMinFragmentsInSegment() {
+        return Integer.parseInt(getOptional("kylin.stream.segment-min-fragments", "15"));
+    }
+
+    public int getStreamingMaxFragmentSizeInMb() {
+        return Integer.parseInt(getOptional("kylin.stream.max-fragment-size-mb", "300"));
+    }
+
+    public boolean isStreamingFragmentsAutoMergeEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.fragments-auto-merge-enable", "true"));
+    }
+
+    public boolean isStreamingConcurrentScanEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.segment.concurrent.scan", "false"));
+    }
+
+    public boolean isStreamingStandAloneMode() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.stand-alone.mode", "false"));
+    }
+
+    public boolean isNewCoordinatorEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.new.coordinator-enabled", "true"));
+    }
+
+    public String getLocalStorageImpl() {
+        return getOptional("kylin.stream.settled.storage", null);
+    }
+
+    public String getStreamMetrics() {
+        return getOptional("kylin.stream.metrics.option", "");
+    }
+
+    /**
+     * Whether to print the encoded integer value for count-distinct string values; for debug/test purposes only.
+     */
+    public boolean isPrintRealtimeDictEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.print-realtime-dict-enabled", "false"));
+    }
+
+    public long getStreamMetricsInterval() {
+        return Long.parseLong(getOptional("kylin.stream.metrics.interval", "5"));
+    }
+
+    /**
+     * Whether realtime queries should add a timezone offset based on Kylin's web timezone; refer to KYLIN-4010 for details.
+     */
+    public String getStreamingDerivedTimeTimezone() {
+        return (getOptional("kylin.stream.event.timezone", ""));
+    }
+
+    public boolean isAutoResubmitDiscardJob() {
+        return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
+    }
+
+    public String getHiveDatabaseLambdaCube() {
+        return this.getOptional("kylin.stream.hive.database-for-lambda-cube", DEFAULT);
+    }
+
+    // ============================================================================
+    // Health Check CLI
     // ============================================================================
-    // Health Check CLI
-    // ============================================================================
 
     public int getWarningSegmentNum() {
         return Integer.parseInt(getOptional("kylin.tool.health-check.warning-segment-num", "-1"));
@@ -2417,7 +2677,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // ENGINE.SPARK (Kylin 4)
+    // Kylin 4.x related
     // ============================================================================
 
     public String getKylinParquetJobJarPath() {
@@ -2516,106 +2776,22 @@ public abstract class KylinConfigBase implements Serializable {
         return new Path(path);
     }
 
-    public Map<String, String> getSparkConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.spark-conf.");
-    }
-
-    public boolean isAutoSetSparkConf() {
-        return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
-    }
-
-    public String getBuildConf() {
-        return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
-    }
-
-    public boolean isJobLogPrintEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
-    public String getClusterInfoFetcherClassName() {
-        return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
-                "org.apache.kylin.cluster.YarnInfoFetcher");
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
-    public String getSparkMergeClassName() {
-        return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
-    }
-
-    public String getParentDatasetStorageLevel() {
-        return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
-    }
-
-    public int getMaxParentDatasetPersistCount() {
-        return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
-    }
-
-    public int getRepartitionNumAfterEncode() {
-        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
-    }
-
-
-    public int getSparkEngineMaxRetryTime() {
-        return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
-    }
-
-    public double getSparkEngineRetryMemoryGradient() {
-        return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
-    }
-
-    public double getSparkEngineRetryOverheadMemoryGradient() {
-        return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
-    }
-
-    public Double getMaxAllocationResourceProportion() {
-        return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
-    }
-
-    public int getSparkEngineBaseExecutorInstances() {
-        return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
-    }
-
-    public String getSparkEngineRequiredTotalCores() {
-        return getOptional("kylin.engine.spark.required-cores", "1");
-    }
-
-    public String getSparkEngineExecutorInstanceStrategy() {
-        return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
+    public boolean isSnapshotParallelBuildEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
     }
 
-    public int getSnapshotShardSizeMB() {
-        return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+    public boolean isUTEnv() {
+        return "UT".equals(getDeployEnv());
     }
 
-    /***
-     * Global dictionary will be split into several buckets. To encode a column to int value more
-     * efficiently, source dataset will be repartitioned by the to-be encoded column to the same
-     * amount of partitions as the dictionary's bucket size.
-     *
-     * It sometimes bring side effect, because repartitioning by a single column is more likely to cause
-     * serious data skew, causing one task takes the majority of time in first layer's cuboid building.
-     *
-     * When faced with this case, you can try repartitioning encoded dataset by all
-     * RowKey columns to avoid data skew. The repartition size is default to max bucket
-     * size of all dictionaries, but you can also set to other flexible value by this option:
-     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
-     ***/
-    public boolean rePartitionEncodedDatasetWithRowKey() {
-        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+    public boolean isLocalEnv() {
+        return "LOCAL".equals(getDeployEnv());
     }
 
-    /**
-     * If we should calculate cuboid statistics for each segment, which is needed for cube planner phase two
-     */
-    public boolean isSegmentStatisticsEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+    public int snapshotParallelBuildTimeoutSeconds() {
+        return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
     }
 
-    // ============================================================================
-    // STORAGE.PARQUET
-    // ============================================================================
-
     public String getStorageProvider() {
         return getOptional("kylin.storage.provider", "org.apache.kylin.common.storage.DefaultStorageProvider");
     }
@@ -2666,21 +2842,61 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.valueOf(getOptional("kylin.storage.columnar.dfs-replication", "3"));
     }
 
-    // ============================================================================
-    // Query Engine (Sparder)
-    // ============================================================================
+    public boolean isAutoSetSparkConf() {
+        return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
+    }
 
-    /**
-     * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
-     */
-    public String getSparderAppName() {
-        String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
-        if (StringUtils.isEmpty(customSparderAppName)) {
-            customSparderAppName =
-                    "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
-        }
-        return customSparderAppName;
+    public String getBuildConf() {
+        return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
+    }
+
+    public boolean isJobLogPrintEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+    public String getClusterInfoFetcherClassName() {
+        return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
+                "org.apache.kylin.cluster.YarnInfoFetcher");
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+    public String getSparkMergeClassName() {
+        return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
+    }
+
+    public int getSparkEngineMaxRetryTime() {
+        return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
     }
+
+    public double getSparkEngineRetryMemoryGradient() {
+        return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
+    }
+
+    public double getSparkEngineRetryOverheadMemoryGradient() {
+        return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
+    }
+
+    public Double getMaxAllocationResourceProportion() {
+        return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
+    }
+
+    public int getSparkEngineBaseExecutorInstances() {
+        return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
+    }
+
+    public String getSparkEngineRequiredTotalCores() {
+        return getOptional("kylin.engine.spark.required-cores", "1");
+    }
+
+    public String getSparkEngineExecutorInstanceStrategy() {
+        return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
+    }
+
+    public int getSnapshotShardSizeMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+    }
+
     /**
      * driver memory that can be used by join(mostly BHJ)
      */
@@ -2852,22 +3068,25 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
     }
 
+    /**
+     * Whether to calculate cuboid statistics for each segment; needed for cube planner phase two.
+     */
+    public boolean isSegmentStatisticsEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+    }
 
     // ============================================================================
     // Spark with Kerberos
     // ============================================================================
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public Boolean isKerberosEnabled() {
         return Boolean.valueOf(getOptional("kylin.kerberos.enabled", FALSE));
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosKeytab() {
         return getOptional("kylin.kerberos.keytab", "");
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosKeytabPath() {
         return KylinConfig.getKylinConfDir() + File.separator + getKerberosKeytab();
     }
@@ -2917,8 +3136,64 @@ public abstract class KylinConfigBase implements Serializable {
         return KylinConfig.getKylinConfDir() + File.separator + getKerberosJaasConf();
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosPrincipal() {
         return getOptional("kylin.kerberos.principal");
     }
+
+    public String getParentDatasetStorageLevel() {
+        return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
+    }
+
+    public int getMaxParentDatasetPersistCount() {
+        return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
+    }
+
+    public int getRepartitionNumAfterEncode() {
+        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
+    }
+
+    /***
+     * A global dictionary is split into several buckets. To encode a column to int values more
+     * efficiently, the source dataset is repartitioned by the to-be-encoded column into the same
+     * number of partitions as the dictionary's bucket size.
+     *
+     * This sometimes has a side effect, because repartitioning by a single column is more likely to cause
+     * serious data skew, making one task take the majority of the time in the first layer's cuboid building.
+     *
+     * When faced with this case, you can try repartitioning the encoded dataset by all
+     * RowKey columns to avoid data skew. The repartition size defaults to the max bucket
+     * size of all dictionaries, but you can also set it to another value via this option:
+     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
+     ***/
+    public boolean rePartitionEncodedDatasetWithRowKey() {
+        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+    }
+
+    /*
+     * Detect dataset skew in the dictionary encoding step.
+     */
+    public boolean detectDataSkewInDictEncodingEnabled() {
+        return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
+    }
+
+    /*
+     * In some data skew cases, the repartition step during dictionary encoding will be slow.
+     * We can sample from the dataset to detect skew. This configuration sets the sample rate.
+     */
+    public double sampleRateInEncodingSkewDetection() {
+        return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
+    }
+
+    /*
+     * In Kylin 4, dictionaries are hashed into several buckets, and column data is repartitioned by the same hash
+     * algorithm during the encoding step. In data skew cases, the repartition step will be very slow. Kylin will
+     * automatically sample from the source to detect skewed values and scatter them to random partitions.
+     * This configuration sets the skew threshold, ranging from 0 to 1.
+     * e.g.
+     *   if you set this value to 0.05, each value that accounts for more than 5% of the total is regarded
+     *   as skewed; consequently there can be at most 20 such skewed values.
+     */
+    public double skewPercentageThreshHold() {
+        return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
+    }
 }
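
Since every getter restored above resolves through getOptional(key, default), each of these properties can be overridden in kylin.properties. Below is a minimal sketch, assuming only the getters defined in the class above (the wrapper class and its output are illustrative), of how a build step might consult the re-introduced skew-detection knobs:

    import org.apache.kylin.common.KylinConfig;

    public class SkewKnobsExample {
        public static void main(String[] args) {
            KylinConfig conf = KylinConfig.getInstanceFromEnv();
            if (conf.detectDataSkewInDictEncodingEnabled()) {
                // defaults: sample 10% of the rows, flag values above 5% of the sample
                double sampleRate = conf.sampleRateInEncodingSkewDetection();
                double threshold = conf.skewPercentageThreshHold();
                System.out.println("skew detection on: sample=" + sampleRate + ", threshold=" + threshold);
            }
        }
    }
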
diff --git a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
index 965d252..53a6967 100644
--- a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
+++ b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
@@ -39,9 +39,6 @@ public @interface ConfigTag {
          */
         DEPRECATED,
 
-        /**
-         * Not tested/verified
-         */
         NOT_CLEAR,
 
         /**
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 9b782de..e5facf4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -101,6 +101,10 @@ public class HadoopUtil {
         return getFileSystem(workingPath, conf);
     }
 
+    public static FileSystem getReadFileSystem() throws IOException {
+        return getFileSystem(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory());
+    }
+
     public static FileSystem getFileSystem(String path) {
         return getFileSystem(new Path(makeURI(path)));
     }
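
The restored helper simply resolves the FileSystem of the configured read HDFS working directory. A usage sketch follows; the segment path is hypothetical, and the cached FileSystem instance is intentionally not closed by the caller:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;

    public class ReadFsExample {
        public static void main(String[] args) throws IOException {
            FileSystem readFs = HadoopUtil.getReadFileSystem();
            Path p = new Path("/kylin/kylin_metadata/sample_segment"); // hypothetical path
            System.out.println(p + " exists: " + readFs.exists(p));
        }
    }
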
diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
index 29f92a6..b4ac16b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
@@ -76,16 +76,16 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase {
         assertEquals("1234", configExt.getOptional("1234"));
     }
 
-//    @Test
-//    public void testPropertiesHotLoad() {
-//        KylinConfig config = KylinConfig.getInstanceFromEnv();
-//        assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
-//
-//        updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
-//        KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
-//
-//        assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
-//    }
+    @Test
+    public void testPropertiesHotLoad() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
+
+        updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
+        KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
+
+        assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
+    }
 
     @Test
     public void testGetMetadataUrlPrefix() {
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index 91be01b..c024c06 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -38,10 +38,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-metadata</artifactId>
         </dependency>
-<!--        <dependency>-->
-<!--            <groupId>org.apache.kylin</groupId>-->
-<!--            <artifactId>kylin-core-dictionary</artifactId>-->
-<!--        </dependency>-->
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-dictionary</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-shaded-guava</artifactId>
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3e99268..585a37a 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,23 +41,34 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.persistence.WriteConflictException;
 import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil;
 import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -65,6 +77,8 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +129,7 @@ public class CubeManager implements IRealizationProvider {
 
     // a few inner classes to group related methods
     private SegmentAssist segAssist = new SegmentAssist();
+    private DictionaryAssist dictAssist = new DictionaryAssist();
 
     private Random ran = new Random();
 
@@ -541,24 +556,27 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        return null;
+        String tableName = join.getPKSide().getTableIdentity();
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName);
+        return getInMemLookupTable(cubeSegment, join, snapshotTableDesc);
     }
 
-//    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
-//            SnapshotTableDesc snapshotTableDesc) {
-//        String tableName = join.getPKSide().getTableIdentity();
-//        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
-//        String[] pkCols = join.getPrimaryKey();
-//
-//        try {
-//            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-//            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
-//            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
-//        } catch (IOException e) {
-//            throw new IllegalStateException(
-//                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
-//        }
-//    }
+    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
+            SnapshotTableDesc snapshotTableDesc) {
+        String tableName = join.getPKSide().getTableIdentity();
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+        String[] pkCols = join.getPrimaryKey();
+
+        try {
+            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+        }
+    }
 
     private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
         String snapshotResPath;
@@ -576,11 +594,20 @@ public class CubeManager implements IRealizationProvider {
 
     @VisibleForTesting
     /*private*/ String generateStorageLocation(int engineType) {
+        String namePrefix = config.getHBaseTableNamePrefix();
+        String namespace = config.getHBaseStorageNameSpace();
         String tableName = "";
         do {
             StringBuffer sb = new StringBuffer();
             int identifierLength = HBASE_TABLE_LENGTH;
+            if (engineType != IEngineAware.ID_SPARK_II) {
+                if (!namespace.equals("default") && !namespace.equals("")) {
+                    sb.append(namespace).append(":");
+                }
+                sb.append(namePrefix);
+            } else {
                 identifierLength = PARQUET_IDENTIFIER_LENGTH;
+            }
             for (int i = 0; i < identifierLength; i++) {
                 sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
             }
@@ -601,6 +628,14 @@ public class CubeManager implements IRealizationProvider {
         return TableMetadataManager.getInstance(config);
     }
 
+    private DictionaryManager getDictionaryManager() {
+        return DictionaryManager.getInstance(config);
+    }
+
+    private SnapshotManager getSnapshotManager() {
+        return SnapshotManager.getInstance(config);
+    }
+
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }
@@ -1090,143 +1125,143 @@ public class CubeManager implements IRealizationProvider {
     // Dictionary/Snapshot related methods
     // ============================================================================
 
-//    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-//            throws IOException {
-//        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
-//    }
-//
-//    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-//            Dictionary<String> dict) throws IOException {
-//        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
-//    }
-//
-//    /**
-//     * return null if no dictionary for given column
-//     */
-//    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-//        return dictAssist.getDictionary(cubeSeg, col);
-//    }
-//
-//    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
-//        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
-//    }
-//
-//    private TableMetadataManager getMetadataManager() {
-//        return TableMetadataManager.getInstance(config);
-//    }
-//
-//    private class DictionaryAssist {
-//        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-//                throws IOException {
-//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-//            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-//                return null;
-//
-//            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
-//            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
-//
-//            saveDictionaryInfo(cubeSeg, col, dictInfo);
-//            return dictInfo;
-//        }
-//
-//        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-//                Dictionary<String> dict) throws IOException {
-//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-//            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-//                return null;
-//
-//            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
-//
-//            saveDictionaryInfo(cubeSeg, col, dictInfo);
-//            return dictInfo;
-//        }
-//
-//        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
-//                throws IOException {
-//            if (dictInfo == null)
-//                return;
-//
-//            // work on copy instead of cached objects
-//            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-//            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-//
-//            Dictionary<?> dict = dictInfo.getDictionaryObject();
-//            segCopy.putDictResPath(col, dictInfo.getResourcePath());
-//            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
-//
-//            CubeUpdate update = new CubeUpdate(cubeCopy);
-//            update.setToUpdateSegs(segCopy);
-//            updateCube(update);
-//        }
-//
-//        /**
-//         * return null if no dictionary for given column
-//         */
-//        @SuppressWarnings("unchecked")
-//        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-//            DictionaryInfo info = null;
-//            String dictResPath = null;
-//            try {
-//                DictionaryManager dictMgr = getDictionaryManager();
-//
-//                //tiretree global domain dic
-//                List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
-//                if (!globalDicts.isEmpty()) {
-//                    dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
-//                }
-//
-//                if (Objects.isNull(dictResPath)){
-//                    dictResPath = cubeSeg.getDictResPath(col);
-//                }
-//
-//                if (dictResPath == null)
-//                    return null;
-//
-//                info = dictMgr.getDictionaryInfo(dictResPath);
-//                if (info == null)
-//                    throw new IllegalStateException("No dictionary found by " + dictResPath
-//                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
-//            } catch (IOException e) {
-//                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
-//                        e);
-//            }
-//            return (Dictionary<String>) info.getDictionaryObject();
-//        }
-//
-//        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
-//                throws IOException {
-//            // work on copy instead of cached objects
-//            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-//            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-//
-//            TableMetadataManager metaMgr = getTableManager();
-//            SnapshotManager snapshotMgr = getSnapshotManager();
-//
-//            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
-//            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
-//            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
-//
-//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-//            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
-//                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-//                CubeUpdate update = new CubeUpdate(cubeCopy);
-//                update.setToUpdateSegs(segCopy);
-//                updateCube(update);
-//
-//                // Update the input cubeSeg after the resource store updated
-//                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
-//            } else {
-//                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
-//                Map<String, String> map = Maps.newHashMap();
-//                map.put(lookupTable, snapshot.getResourcePath());
-//                cubeUpdate.setUpdateTableSnapshotPath(map);
-//                updateCube(cubeUpdate);
-//
-//                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-//            }
-//            return snapshot;
-//        }
-//    }
+    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+            throws IOException {
+        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
+    }
+
+    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+            Dictionary<String> dict) throws IOException {
+        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
+    }
+
+    /**
+     * return null if no dictionary for given column
+     */
+    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+        return dictAssist.getDictionary(cubeSeg, col);
+    }
+
+    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
+        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
+    }
+
+    private TableMetadataManager getMetadataManager() {
+        return TableMetadataManager.getInstance(config);
+    }
+
+    private class DictionaryAssist {
+        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+                throws IOException {
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+                return null;
+
+            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
+
+            saveDictionaryInfo(cubeSeg, col, dictInfo);
+            return dictInfo;
+        }
+
+        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+                Dictionary<String> dict) throws IOException {
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+                return null;
+
+            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
+
+            saveDictionaryInfo(cubeSeg, col, dictInfo);
+            return dictInfo;
+        }
+
+        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
+                throws IOException {
+            if (dictInfo == null)
+                return;
+
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
+            Dictionary<?> dict = dictInfo.getDictionaryObject();
+            segCopy.putDictResPath(col, dictInfo.getResourcePath());
+            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(segCopy);
+            updateCube(update);
+        }
+
+        /**
+         * return null if no dictionary for given column
+         */
+        @SuppressWarnings("unchecked")
+        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+            DictionaryInfo info = null;
+            String dictResPath = null;
+            try {
+                DictionaryManager dictMgr = getDictionaryManager();
+
+                // tiretree global domain dict
+                List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
+                if (!globalDicts.isEmpty()) {
+                    dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
+                }
+
+                if (Objects.isNull(dictResPath)){
+                    dictResPath = cubeSeg.getDictResPath(col);
+                }
+
+                if (dictResPath == null)
+                    return null;
+
+                info = dictMgr.getDictionaryInfo(dictResPath);
+                if (info == null)
+                    throw new IllegalStateException("No dictionary found by " + dictResPath
+                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
+                        e);
+            }
+            return (Dictionary<String>) info.getDictionaryObject();
+        }
+
+        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
+                throws IOException {
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+
+            TableMetadataManager metaMgr = getTableManager();
+            SnapshotManager snapshotMgr = getSnapshotManager();
+
+            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
+            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
+            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
+
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
+                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+                CubeUpdate update = new CubeUpdate(cubeCopy);
+                update.setToUpdateSegs(segCopy);
+                updateCube(update);
+
+                // Update the input cubeSeg after the resource store updated
+                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
+            } else {
+                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+                Map<String, String> map = Maps.newHashMap();
+                map.put(lookupTable, snapshot.getResourcePath());
+                cubeUpdate.setUpdateTableSnapshotPath(map);
+                updateCube(cubeUpdate);
+
+                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+            }
+            return snapshot;
+        }
+    }
 
     /**
      * To keep "select * from LOOKUP_TABLE" has consistent and latest result, we manually choose
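
With DictionaryAssist restored, callers once again obtain per-column dictionaries through CubeManager. A minimal read-side sketch, assuming a cube named sample_cube exists (getDictionary returns null for columns without a dictionary):

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.common.util.Dictionary;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.metadata.model.TblColRef;

    public class DictLookupExample {
        public static void main(String[] args) {
            CubeManager mgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubeSegment seg = mgr.getCube("sample_cube").getLatestReadySegment();
            for (TblColRef col : seg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
                Dictionary<String> dict = mgr.getDictionary(seg, col);
                System.out.println(col.getIdentity() + " -> "
                        + (dict == null ? "no dictionary" : dict.getSize() + " entries"));
            }
        }
    }
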
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 5205cc5..c32da70 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -371,24 +371,26 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
         this.storageLocationIdentifier = storageLocationIdentifier;
     }
 
-//    public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
-//        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-//        for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
-//            result.put(col, (Dictionary<String>) getDictionary(col));
-//        }
-//        return result;
-//    }
-//
-//    public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
-//        Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
-//        for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
-//            result.put(col, getDictionary(col));
-//        }
-//        return result;
-//    }
+    public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
+            result.put(col, (Dictionary<String>) getDictionary(col));
+        }
+        return result;
+    }
+
+    public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
+        for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
+            result.put(col, getDictionary(col));
+        }
+        return result;
+    }
 
     public Dictionary<String> getDictionary(TblColRef col) {
-        return null;
+        TblColRef reuseCol = getCubeDesc().getDictionaryReuseColumn(col);
+        CubeManager cubeMgr = CubeManager.getInstance(this.getCubeInstance().getConfig());
+        return cubeMgr.getDictionary(this, reuseCol);
     }
 
     public CubeDimEncMap getDimensionEncodingMap() {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
new file mode 100644
index 0000000..0815942
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+import org.apache.kylin.dict.DictionaryProvider;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
+
+public class DictionaryGeneratorCLI {
+
+    private DictionaryGeneratorCLI() {
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
+
+    public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid,
+            DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeSegment segment = cube.getSegmentById(segmentID);
+
+        int retryTime = 0;
+        while (retryTime < 3) {
+            if (retryTime > 0) {
+                logger.info("Rebuild dictionary and snapshot for Cube: {}, Segment: {}, {} times.", cubeName, segmentID,
+                        retryTime);
+            }
+
+            processSegment(config, segment, uuid, factTableValueProvider, dictProvider);
+
+            if (isAllDictsAndSnapshotsReady(config, cubeName, segmentID)) {
+                break;
+            }
+            retryTime++;
+        }
+
+        if (retryTime >= 3) {
+            logger.error("Not all dictionaries and snapshots ready for cube segment: {}", segmentID);
+        } else {
+            logger.info("Succeed to build all dictionaries and snapshots for cube segment: {}", segmentID);
+        }
+    }
+
+    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid,
+            DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        // dictionary
+        for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
+            logger.info("Building dictionary for {}", col);
+            IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col);
+
+            Dictionary<String> preBuiltDict = null;
+            if (dictProvider != null) {
+                preBuiltDict = dictProvider.getDictionary(col);
+            }
+
+            if (preBuiltDict != null) {
+                logger.debug("Dict for '{}' has already been built, save it", col.getName());
+                cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict);
+            } else {
+                logger.debug("Dict for '{}' not pre-built, build it from {}", col.getName(), inpTable);
+                cubeMgr.buildDictionary(cubeSeg, col, inpTable);
+            }
+        }
+
+        // snapshot
+        Set<String> toSnapshot = Sets.newHashSet();
+        Set<TableRef> toCheckLookup = Sets.newHashSet();
+        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
+            TableRef table = dim.getTableRef();
+            if (cubeSeg.getModel().isLookupTable(table)) {
+                // a snapshot is needed only when the snapshot desc is not of ext type
+                toSnapshot.add(table.getTableIdentity());
+                toCheckLookup.add(table);
+            }
+        }
+
+        for (String tableIdentity : toSnapshot) {
+            logger.info("Building snapshot of {}", tableIdentity);
+            cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid);
+        }
+
+        CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName());
+        cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid());
+        for (TableRef lookup : toCheckLookup) {
+            logger.info("Checking snapshot of {}", lookup);
+            try {
+                JoinDesc join = cubeSeg.getModel().getJoinsTree().getJoinByPKSide(lookup);
+                ILookupTable table = cubeMgr.getLookupTable(cubeSeg, join);
+                if (table != null) {
+                    IOUtils.closeStream(table);
+                }
+            } catch (Throwable th) {
+                throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
+            }
+        }
+    }
+
+    private static boolean isAllDictsAndSnapshotsReady(KylinConfig config, String cubeName, String segmentID) {
+        CubeInstance cube = CubeManager.getInstance(config).reloadCube(cubeName);
+        CubeSegment segment = cube.getSegmentById(segmentID);
+        ResourceStore store = ResourceStore.getStore(config);
+
+        // check dicts
+        logger.info("Begin to check if all dictionaries exist of Segment: {}", segmentID);
+        Map<String, String> dictionaries = segment.getDictionaries();
+        for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
+            String dictResPath = entry.getValue();
+            String dictKey = entry.getKey();
+            try {
+                DictionaryInfo dictInfo = store.getResource(dictResPath, DictionaryInfoSerializer.INFO_SERIALIZER);
+                if (dictInfo == null) {
+                    logger.warn("Dictionary=[key: {}, resource path: {}] doesn't exist in resource store", dictKey,
+                            dictResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("Dictionary=[key: {}, path: {}] failed to check, details: {}", dictKey, dictResPath, e);
+                return false;
+            }
+        }
+
+        // check snapshots
+        logger.info("Begin to check if all snapshots exist of Segment: {}", segmentID);
+        Map<String, String> snapshots = segment.getSnapshots();
+        for (Map.Entry<String, String> entry : snapshots.entrySet()) {
+            String snapshotKey = entry.getKey();
+            String snapshotResPath = entry.getValue();
+            try {
+                SnapshotTable snapshot = store.getResource(snapshotResPath, SnapshotTableSerializer.INFO_SERIALIZER);
+                if (snapshot == null) {
+                    logger.info("SnapshotTable=[key: {}, resource path: {}] doesn't exist in resource store",
+                            snapshotKey, snapshotResPath);
+                    return false;
+                }
+            } catch (IOException e) {
+                logger.warn("SnapshotTable=[key: {}, resource path: {}]  failed to check, details: {}", snapshotKey,
+                        snapshotResPath, e);
+                return false;
+            }
+        }
+
+        logger.info("All dictionaries and snapshots exist checking succeed for Cube Segment: {}", segmentID);
+        return true;
+    }
+}
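
processSegment above retries the whole dictionary-and-snapshot build up to three times, re-checking every resource in the store between attempts. A sketch of a programmatic invocation; the provider here is a stub (a real build step would return the readable table of distinct values produced by the fact-distinct step), and the cube, segment, and job identifiers are hypothetical:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
    import org.apache.kylin.dict.DistinctColumnValuesProvider;

    public class BuildDictStep {
        public static void main(String[] args) throws Exception {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            DistinctColumnValuesProvider provider = col -> {
                // stub: wire this to the distinct-value files of `col`
                throw new UnsupportedOperationException("no distinct values for " + col);
            };
            DictionaryGeneratorCLI.processSegment(config, "sample_cube", "segment-uuid", "job-uuid",
                    provider, null /* no pre-built dictionaries */);
        }
    }
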
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
new file mode 100644
index 0000000..729a6da
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.cli;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+
+public class DumpDictionaryCLI {
+
+    public static void main(String[] args) throws IOException {
+        for (String path : args) {
+            dump(new File(path));
+        }
+    }
+
+    public static void dump(File f) throws IOException {
+        if (f.isDirectory()) {
+            File[] files = f.listFiles();
+            if (files == null) {
+                return;
+            }
+            for (File c : files)
+                dump(c);
+            return;
+        }
+
+        if (f.getName().endsWith(".dict")) {
+            DictionaryInfoSerializer ser = new DictionaryInfoSerializer();
+            DictionaryInfo dictInfo = ser.deserialize(new DataInputStream(new FileInputStream(f)));
+
+            System.out.println("============================================================================");
+            System.out.println("File: " + f.getAbsolutePath());
+            System.out.println(new Date(dictInfo.getLastModified()));
+            System.out.println(JsonUtil.writeValueAsIndentString(dictInfo));
+            dictInfo.getDictionaryObject().dump(System.out);
+            System.out.println();
+        }
+    }
+}
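
Note: DumpDictionaryCLI walks any mix of files and directories given on the command line and, for every .dict file, prints its path, last-modified time, the DictionaryInfo as indented JSON, and the dictionary's own dump. It can also be driven programmatically; a minimal sketch (the path is illustrative):

    // dump all .dict files under a local metadata export
    DumpDictionaryCLI.dump(new File("/tmp/kylin_meta/dict"));
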
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 6bf67db..c0c6882 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -60,6 +60,8 @@ import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
+import org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
 import org.apache.kylin.metadata.MetadataConstants;
@@ -1557,30 +1559,30 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         return null;
     }
 
-//    public List<TblColRef> getAllGlobalDictColumns() {
-//        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
-//        List<DictionaryDesc> dictionaryDescList = getDictionaries();
-//
-//        if (dictionaryDescList == null) {
-//            return globalDictCols;
-//        }
-//
-//        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
-//            String cls = dictionaryDesc.getBuilderClass();
-//            if (GlobalDictionaryBuilder.class.getName().equals(cls)
-//                    || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
-//                globalDictCols.add(dictionaryDesc.getColumnRef());
-//        }
-//        return globalDictCols;
-//    }
+    public List<TblColRef> getAllGlobalDictColumns() {
+        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
+        List<DictionaryDesc> dictionaryDescList = getDictionaries();
+
+        if (dictionaryDescList == null) {
+            return globalDictCols;
+        }
+
+        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+            String cls = dictionaryDesc.getBuilderClass();
+            if (GlobalDictionaryBuilder.class.getName().equals(cls)
+                    || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
+                globalDictCols.add(dictionaryDesc.getColumnRef());
+        }
+        return globalDictCols;
+    }
 
     // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
-//    public List<TblColRef> getAllUHCColumns() {
-//        List<TblColRef> uhcColumns = new ArrayList<>();
-//        uhcColumns.addAll(getAllGlobalDictColumns());
-//        uhcColumns.addAll(getShardByColumns());
-//        return uhcColumns;
-//    }
+    public List<TblColRef> getAllUHCColumns() {
+        List<TblColRef> uhcColumns = new ArrayList<>();
+        uhcColumns.addAll(getAllGlobalDictColumns());
+        uhcColumns.addAll(getShardByColumns());
+        return uhcColumns;
+    }
 
     public String getProject() {
         DataModelDesc modelDesc = getModel();
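
Note: getAllUHCColumns() restores the original definition of ultra-high-cardinality columns, the union (as a list, without de-duplication) of global-dictionary columns and shard-by columns. A minimal sketch of consuming it, assuming a CubeDesc obtained from CubeDescManager (the descriptor name is illustrative):

    CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc("my_cube_desc");
    for (TblColRef uhc : cubeDesc.getAllUHCColumns()) {
        // route UHC columns to the dedicated dictionary build steps
        System.out.println("UHC column: " + uhc.getIdentity());
    }
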
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
index 28861a1..30f533b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.cube.model;
 
+import org.apache.kylin.dict.lookup.SnapshotTable;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -31,8 +33,8 @@ SnapshotTableDesc implements java.io.Serializable{
     private String tableName;
 
     @JsonProperty("storage_type")
-    private String storageType = "Parquet";
-    
+    private String storageType = SnapshotTable.STORAGE_TYPE_METASTORE;
+
     @JsonProperty("local_cache_enable")
     private boolean enableLocalCache = true;
 
@@ -63,6 +65,10 @@ SnapshotTableDesc implements java.io.Serializable{
         this.global = global;
     }
 
+    public boolean isExtSnapshotTable() {
+        return !SnapshotTable.STORAGE_TYPE_METASTORE.equals(storageType);
+    }
+
     public boolean isEnableLocalCache() {
         return enableLocalCache;
     }
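
Note: with the default storage type restored to SnapshotTable.STORAGE_TYPE_METASTORE, isExtSnapshotTable() is true only when a cube explicitly chooses a non-metastore snapshot storage. A quick sketch of the semantics under that assumption:

    SnapshotTableDesc desc = new SnapshotTableDesc();
    // default storage type is the metastore, so this is not an "ext" snapshot
    assert !desc.isExtSnapshotTable();
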
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index 04535fd..9023f28 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -30,6 +30,7 @@ import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +92,11 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
                 }
             }
 
+            if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol) && rowKeyDesc.isUseDictionary(dictCol)) {
+                context.addResult(ResultLevel.ERROR, ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE + dictCol);
+                return;
+            }
+
             if (reuseCol != null) {
                 reuseDictionaries.add(dictDesc);
             } else {
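
Note: the restored rule forbids GlobalDictionaryBuilder on a dictionary-encoded rowkey dimension, since a global (append trie) dictionary supports only value-to-id lookups and cannot decode rowkeys; it remains valid for measure-only columns such as bitmap COUNT DISTINCT inputs. A sketch of the two cases, mirroring the DictionaryRuleTest additions later in this commit:

    // valid: SELLER_ID feeds only a measure, so a global dictionary is allowed
    DictionaryDesc ok = DictionaryDesc.create("SELLER_ID", null, GlobalDictionaryBuilder.class.getName());
    // rejected with ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE: a dict-encoded rowkey dimension
    DictionaryDesc bad = DictionaryDesc.create("CATEG_LVL2_NAME", null, GlobalDictionaryBuilder.class.getName());
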
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 7c3958c..c6d0c00 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -18,150 +18,150 @@
 
 package org.apache.kylin.cube.util;
 
-//import java.io.IOException;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Locale;
-//import java.util.Map;
-//import java.util.Set;
-//
-//import org.apache.kylin.common.util.Dictionary;
-//import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeSegment;
-//import org.apache.kylin.cube.cuboid.Cuboid;
-//import org.apache.kylin.cube.model.CubeDesc;
-//import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-//import org.apache.kylin.dict.DictionaryGenerator;
-//import org.apache.kylin.dict.DictionaryInfo;
-//import org.apache.kylin.dict.DictionaryManager;
-//import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-//import org.apache.kylin.measure.hllc.HLLCounter;
-//import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-//import org.apache.kylin.metadata.model.TblColRef;
-//import org.apache.kylin.source.IReadableTable;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-//import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
-//import org.apache.kylin.shaded.com.google.common.collect.Maps;
-//import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
-//import org.apache.kylin.shaded.com.google.common.hash.Hasher;
-//import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
+import org.apache.kylin.shaded.com.google.common.hash.Hasher;
+import org.apache.kylin.shaded.com.google.common.hash.Hashing;
 
 /**
  */
 public class CubingUtils {
-//
-//    private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
-//
-//    public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
-//            Iterable<List<String>> streams) {
-//        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
-//        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
-//        final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
-//        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-//        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
-//
-//        final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-//        for (Long cuboidId : allCuboidIds) {
-//            result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
-//            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
-//
-//            long mask = Long.highestOneBit(baseCuboidId);
-//            int position = 0;
-//            for (int i = 0; i < rowkeyLength; i++) {
-//                if ((mask & cuboidId) > 0) {
-//                    cuboidBitSet[position] = i;
-//                    position++;
-//                }
-//                mask = mask >> 1;
-//            }
-//            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-//        }
-//
-//        HashFunction hf = Hashing.murmur3_32();
-//        byte[][] row_hashcodes = new byte[rowkeyLength][];
-//        for (List<String> row : streams) {
-//            //generate hash for each row key column
-//            for (int i = 0; i < rowkeyLength; i++) {
-//                Hasher hc = hf.newHasher();
-//                final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
-//                if (cell != null) {
-//                    row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
-//                } else {
-//                    row_hashcodes[i] = hc.putInt(0).hash().asBytes();
-//                }
-//            }
-//
-//            for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
-//                Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
-//                HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
-//                Hasher hc = hf.newHasher();
-//                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
-//                for (int position = 0; position < cuboidBitSet.length; position++) {
-//                    hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
-//                }
-//                counter.add(hc.hash().asBytes());
-//            }
-//        }
-//        return result;
-//    }
-//
-//    public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
-//            Iterable<List<String>> recordList) throws IOException {
-//        final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
-//                .listDimensionColumnsExcludingDerived(true);
-//        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-//        int index = 0;
-//        for (TblColRef column : columnsNeedToBuildDictionary) {
-//            tblColRefMap.put(index++, column);
-//        }
-//
-//        HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-//
-//        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
-//        for (List<String> row : recordList) {
-//            for (int i = 0; i < row.size(); i++) {
-//                String cell = row.get(i);
-//                if (tblColRefMap.containsKey(i)) {
-//                    valueMap.put(tblColRefMap.get(i), cell);
-//                }
-//            }
-//        }
-//        for (TblColRef tblColRef : valueMap.keySet()) {
-//            Set<String> values = valueMap.get(tblColRef);
-//            Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
-//                    new IterableDictionaryValueEnumerator(values));
-//            result.put(tblColRef, dict);
-//        }
-//        return result;
-//    }
-//
-//    @SuppressWarnings("unchecked")
-//    public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
-//            Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
-//        Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
-//
-//        for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
-//            final TblColRef tblColRef = entry.getKey();
-//            final Dictionary<String> dictionary = entry.getValue();
-//            IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
-//            signature.setLastModifiedTime(System.currentTimeMillis());
-//            signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
-//            signature.setSize(endOffset - startOffset);
-//            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
-//            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
-//            DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
-//            try {
-//                DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
-//                cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
-//                realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
-//            } catch (IOException e) {
-//                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
-//            }
-//        }
-//
-//        return realDictMap;
-//    }
-//
+
+    private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
+
+    public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
+            Iterable<List<String>> streams) {
+        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
+        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+        final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+
+        final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        for (Long cuboidId : allCuboidIds) {
+            result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
+
+            long mask = Long.highestOneBit(baseCuboidId);
+            int position = 0;
+            for (int i = 0; i < rowkeyLength; i++) {
+                if ((mask & cuboidId) > 0) {
+                    cuboidBitSet[position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+        }
+
+        HashFunction hf = Hashing.murmur3_32();
+        byte[][] row_hashcodes = new byte[rowkeyLength][];
+        for (List<String> row : streams) {
+            //generate hash for each row key column
+            for (int i = 0; i < rowkeyLength; i++) {
+                Hasher hc = hf.newHasher();
+                final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
+                if (cell != null) {
+                    row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
+                } else {
+                    row_hashcodes[i] = hc.putInt(0).hash().asBytes();
+                }
+            }
+
+            for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
+                Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
+                HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
+                Hasher hc = hf.newHasher();
+                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+                for (int position = 0; position < cuboidBitSet.length; position++) {
+                    hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
+                }
+                counter.add(hc.hash().asBytes());
+            }
+        }
+        return result;
+    }
+
+    public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
+            Iterable<List<String>> recordList) throws IOException {
+        final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
+                .listDimensionColumnsExcludingDerived(true);
+        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+        int index = 0;
+        for (TblColRef column : columnsNeedToBuildDictionary) {
+            tblColRefMap.put(index++, column);
+        }
+
+        HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+
+        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        for (List<String> row : recordList) {
+            for (int i = 0; i < row.size(); i++) {
+                String cell = row.get(i);
+                if (tblColRefMap.containsKey(i)) {
+                    valueMap.put(tblColRefMap.get(i), cell);
+                }
+            }
+        }
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            Set<String> values = valueMap.get(tblColRef);
+            Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
+                    new IterableDictionaryValueEnumerator(values));
+            result.put(tblColRef, dict);
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
+            Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
+        Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
+
+        for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
+            final TblColRef tblColRef = entry.getKey();
+            final Dictionary<String> dictionary = entry.getValue();
+            IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
+            signature.setLastModifiedTime(System.currentTimeMillis());
+            signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
+            signature.setSize(endOffset - startOffset);
+            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
+            logger.info("writing dictionary for TblColRef: {}", tblColRef);
+            DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
+            try {
+                DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
+                cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
+                realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
+            } catch (IOException e) {
+                throw new RuntimeException("error saving dictionary for column: " + tblColRef, e);
+            }
+        }
+
+        return realDictMap;
+    }
+
 }
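
Note: sampling() hashes each rowkey column once per row, then combines the per-column hashes for every cuboid and feeds the result to that cuboid's HLLCounter, so one pass over the flat table yields cardinality estimates for all cuboids. A minimal sketch of reading the estimates back (cubeDesc, flatDesc and rows are assumed to exist):

    Map<Long, HLLCounter> stats = CubingUtils.sampling(cubeDesc, flatDesc, rows); // rows: Iterable<List<String>>
    for (Map.Entry<Long, HLLCounter> e : stats.entrySet()) {
        System.out.println("cuboid " + e.getKey() + " ~" + e.getValue().getCountEstimate() + " rows");
    }
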
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index 90cabfa..de436d0 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube.model.validation.rule;
 
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_DUPLICATE_DICTIONARY_COLUMN;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_EMPTY;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_SET;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_TRANSITIVE_REUSE;
@@ -36,6 +37,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,6 +75,7 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
     @Test
     public void testBadDesc() throws IOException {
         testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, "FakeBuilderClass"));
+        testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, GlobalDictionaryBuilder.class.getName()));
     }
 
     @Test
@@ -92,6 +95,17 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
                 DictionaryDesc.create("price", "lstg_site_id", null));
     }
 
+    @Test
+    public void testBadDesc5() throws IOException {
+        testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE,
+                DictionaryDesc.create("CATEG_LVL2_NAME", null, GlobalDictionaryBuilder.class.getName()));
+    }
+
+    @Test
+    public void testGoodDesc2() throws IOException {
+        testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, GlobalDictionaryBuilder.class.getName()));
+    }
+
     private void testDictionaryDesc(String expectMessage, DictionaryDesc... descs) throws IOException {
         DictionaryRule rule = new DictionaryRule();
         File f = new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_without_slr_left_join_desc.json");
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index f132569..a0730ff 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -40,196 +40,196 @@ import com.google.common.collect.Lists;
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class DictionaryGenerator {
-//
-//    private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
-//
-//    public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
-//        Preconditions.checkNotNull(dataType, "dataType cannot be null");
-//
-//        // build dict, case by data type
-//        IDictionaryBuilder builder;
-//        boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
-//        if (dataType.isNumberFamily())
-//            builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
-//        else
-//            builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
-//        return builder;
-//    }
-//
-//    public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
-//            throws IOException {
-//        return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
-//    }
-//
-//    static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
-//            IDictionaryValueEnumerator valueEnumerator) throws IOException {
-//        int baseId = 0; // always 0 for now
-//        int nSamples = 5;
-//        ArrayList<String> samples = new ArrayList<String>(nSamples);
-//
-//        // init the builder
-//        builder.init(dictInfo, baseId, null);
-//
-//        // add values
-//        try {
-//            while (valueEnumerator.moveNext()) {
-//                String value = valueEnumerator.current();
-//
-//                boolean accept = builder.addValue(value);
-//
-//                if (accept && samples.size() < nSamples && samples.contains(value) == false)
-//                    samples.add(value);
-//            }
-//        } catch (IOException e) {
-//            logger.error("Error during adding dict value.", e);
-//            builder.clear();
-//            throw e;
-//        }
-//
-//        // build
-//        Dictionary<String> dict = builder.build();
-//        logger.debug("Dictionary cardinality: " + dict.getSize());
-//        logger.debug("Dictionary builder class: " + builder.getClass().getName());
-//        logger.debug("Dictionary class: " + dict.getClass().getName());
-//        // log a few samples
-//        StringBuilder buf = new StringBuilder();
-//        for (String s : samples) {
-//            if (buf.length() > 0) {
-//                buf.append(", ");
-//            }
-//            buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
-//        }
-//        logger.debug("Dictionary value samples: " + buf.toString());
-//
-//        return dict;
-//    }
-//
-//    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
-//        List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
-//            @Nullable
-//            @Override
-//            public Dictionary<String> apply(@Nullable DictionaryInfo input) {
-//                return input.dictionaryObject;
-//            }
-//        });
-//        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
-//    }
-//
-//    private static class StringTrieDictBuilder implements IDictionaryBuilder {
-//        int baseId;
-//        TrieDictionaryBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            this.baseId = baseId;
-//            this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (value == null)
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build(baseId);
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
-//
-//    private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
-//        TrieDictionaryForestBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (value == null)
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build();
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
-//
-//    @SuppressWarnings("deprecation")
-//    private static class NumberTrieDictBuilder implements IDictionaryBuilder {
-//        int baseId;
-//        NumberDictionaryBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            this.baseId = baseId;
-//            this.builder = new NumberDictionaryBuilder();
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (StringUtils.isBlank(value)) // empty string is treated as null
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build(baseId);
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
-//
-//    private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
-//        NumberDictionaryForestBuilder builder;
-//
-//        @Override
-//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-//            builder = new NumberDictionaryForestBuilder(baseId);
-//        }
-//
-//        @Override
-//        public boolean addValue(String value) {
-//            if (StringUtils.isBlank(value)) // empty string is treated as null
-//                return false;
-//
-//            builder.addValue(value);
-//            return true;
-//        }
-//
-//        @Override
-//        public Dictionary<String> build() throws IOException {
-//            return builder.build();
-//        }
-//
-//        @Override
-//        public void clear() {
-//
-//        }
-//    }
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
+
+    public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
+        Preconditions.checkNotNull(dataType, "dataType cannot be null");
+
+        // build dict, case by data type
+        IDictionaryBuilder builder;
+        boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
+        if (dataType.isNumberFamily())
+            builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
+        else
+            builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
+        return builder;
+    }
+
+    public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
+            throws IOException {
+        return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
+    }
+
+    static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
+            IDictionaryValueEnumerator valueEnumerator) throws IOException {
+        int baseId = 0; // always 0 for now
+        int nSamples = 5;
+        ArrayList<String> samples = new ArrayList<String>(nSamples);
+
+        // init the builder
+        builder.init(dictInfo, baseId, null);
+
+        // add values
+        try {
+            while (valueEnumerator.moveNext()) {
+                String value = valueEnumerator.current();
+
+                boolean accept = builder.addValue(value);
+
+                if (accept && samples.size() < nSamples && !samples.contains(value))
+                    samples.add(value);
+            }
+        } catch (IOException e) {
+            logger.error("Error while adding dict value.", e);
+            builder.clear();
+            throw e;
+        }
+
+        // build
+        Dictionary<String> dict = builder.build();
+        logger.debug("Dictionary cardinality: " + dict.getSize());
+        logger.debug("Dictionary builder class: " + builder.getClass().getName());
+        logger.debug("Dictionary class: " + dict.getClass().getName());
+        // log a few samples
+        StringBuilder buf = new StringBuilder();
+        for (String s : samples) {
+            if (buf.length() > 0) {
+                buf.append(", ");
+            }
+            buf.append(s).append("=>").append(dict.getIdFromValue(s));
+        }
+        logger.debug("Dictionary value samples: " + buf.toString());
+
+        return dict;
+    }
+
+    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
+        List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
+            @Nullable
+            @Override
+            public Dictionary<String> apply(@Nullable DictionaryInfo input) {
+                return input.dictionaryObject;
+            }
+        });
+        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
+    }
+
+    private static class StringTrieDictBuilder implements IDictionaryBuilder {
+        int baseId;
+        TrieDictionaryBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            this.baseId = baseId;
+            this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (value == null)
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build(baseId);
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
+
+    private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
+        TrieDictionaryForestBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (value == null)
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build();
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private static class NumberTrieDictBuilder implements IDictionaryBuilder {
+        int baseId;
+        NumberDictionaryBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            this.baseId = baseId;
+            this.builder = new NumberDictionaryBuilder();
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (StringUtils.isBlank(value)) // empty string is treated as null
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build(baseId);
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
+
+    private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
+        NumberDictionaryForestBuilder builder;
+
+        @Override
+        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+            builder = new NumberDictionaryForestBuilder(baseId);
+        }
+
+        @Override
+        public boolean addValue(String value) {
+            if (StringUtils.isBlank(value)) // empty string is treated as null
+                return false;
+
+            builder.addValue(value);
+            return true;
+        }
+
+        @Override
+        public Dictionary<String> build() throws IOException {
+            return builder.build();
+        }
+
+        @Override
+        public void clear() {
+
+        }
+    }
 
 }
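
Note: buildDictionary() streams values through a builder chosen by data type (the forest trie variants when isUseForestTrieDictionary() is on) and logs a handful of sample mappings. A minimal sketch building a string dictionary from in-memory values (the value set is illustrative):

    List<String> values = Arrays.asList("CN", "JP", "US");
    Dictionary<String> dict = DictionaryGenerator.buildDictionary(DataType.getType("varchar(256)"),
            new IterableDictionaryValueEnumerator(values));
    System.out.println("id of US = " + dict.getIdFromValue("US"));
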
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
similarity index 96%
rename from core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
rename to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
index 794b1e5..dccb7c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.cube;
+package org.apache.kylin.dict.lookup;
 
 import org.apache.kylin.common.util.Array;
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index c72681b..9d591b5 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -63,7 +63,7 @@ public class SnapshotManager {
                 SnapshotManager.logger.info("Snapshot with resource path {} is removed due to {}",
                         notification.getKey(), notification.getCause());
             }
-        }).maximumSize(100L)//
+        }).maximumSize(config.getCachedSnapshotMaxEntrySize())//
                 .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, SnapshotTable>() {
                     @Override
                     public SnapshotTable load(String key) throws Exception {
@@ -173,11 +173,11 @@ public class SnapshotManager {
             KylinConfig cubeConfig) throws IOException {
         String dup = checkDupByInfo(snapshot);
 
-//        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
-//            throw new IllegalStateException(
-//                    "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
-//                            + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
-//        }
+        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
+            throw new IllegalStateException(
+                    "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
+                            + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize() + " bytes");
+        }
 
         if (dup != null) {
             logger.info("Identical input {}, reuse existing snapshot at {}", table.getSignature(), dup);
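
Note: the restored guard converts the snapshot's byte size to MB before comparing against getTableSnapshotMaxMB(), so an oversized lookup table fails the build instead of bloating the metastore. The arithmetic, stand-alone (the 300 MB default cap is an assumption):

    long sizeBytes = 350L * 1024 * 1024;
    float sizeMB = (float) sizeBytes / 1024 / 1024;      // 350.0
    boolean rejected = sizeMB > 300;                     // true: snapshot too large
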
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 31cc081..2c4bc6a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
@@ -111,16 +113,16 @@ public class JoinedFlatTable {
             kylinConfig = (flatDesc.getSegment()).getConfig();
         }
 
-//        if (kylinConfig.isAdvancedFlatTableUsed()) {
-//            try {
-//                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
-//                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
-//                        JobEngineConfig.class);
-//                return (String) method.invoke(null, flatDesc);
-//            } catch (Exception e) {
-//                throw new RuntimeException(e);
-//            }
-//        }
+        if (kylinConfig.isAdvancedFlatTableUsed()) {
+            try {
+                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
+                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
+                        JobEngineConfig.class);
+                return (String) method.invoke(null, flatDesc, new JobEngineConfig(kylinConfig)); // supply both declared parameters
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
 
         return "INSERT OVERWRITE TABLE " + quoteIdentifier(flatDesc.getTableName(), null) + " " + generateSelectDataStatement(flatDesc)
                 + ";\n";
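
Note: the reflective hook above expects the class named by getAdvancedFlatTableClass() to expose a static generateInsertDataStatement(IJoinedFlatTableDesc, JobEngineConfig) returning the complete INSERT statement. A minimal sketch of a conforming class (the class name and SQL body are illustrative):

    public class MyAdvancedFlatTable {
        // signature must match the reflective lookup in JoinedFlatTable
        public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig) {
            return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " /* custom SELECT here */;\n";
        }
    }
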
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
index 937c1ad..422a802 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
@@ -32,7 +32,6 @@ import org.apache.kylin.shaded.com.google.common.base.Predicate;
 import org.apache.kylin.shaded.com.google.common.collect.Iterables;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 
-@Deprecated
 public abstract class DimensionEncodingFactory {
 
     private static final Logger logger = LoggerFactory.getLogger(DimensionEncodingFactory.class);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index 55bdb4b..acdc2e9 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.threadlocal.InternalThreadLocal;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -31,7 +32,19 @@ public class StorageFactory {
     // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
     private static InternalThreadLocal<ImplementationSwitch<IStorage>> storages = new InternalThreadLocal<>();
 
+    private static IStorage configuredUseLocalStorage;
+
+    static {
+        String localStorageImpl = KylinConfig.getInstanceFromEnv().getLocalStorageImpl();
+        if (localStorageImpl != null) {
+            configuredUseLocalStorage = (IStorage) ClassUtil.newInstance(localStorageImpl);
+        }
+    }
+
     public static IStorage storage(IStorageAware aware) {
+        if (configuredUseLocalStorage != null) {
+            return configuredUseLocalStorage;
+        }
         ImplementationSwitch<IStorage> current = storages.get();
         if (storages.get() == null) {
             current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getStorageEngines(), IStorage.class);
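
Note: the new static block lets a deployment pin a single IStorage for every realization; when getLocalStorageImpl() returns a class name, storage(aware) short-circuits to that instance instead of consulting the per-thread ImplementationSwitch. Any IStorage with a no-arg constructor qualifies; a stub sketch, assuming the two-method IStorage interface:

    public class LocalOnlyStorage implements IStorage {
        @Override
        public IStorageQuery createQuery(IRealization realization) {
            throw new UnsupportedOperationException("wire a local query backend here");
        }

        @Override
        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
            throw new UnsupportedOperationException("no build engine in this sketch");
        }
    }
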
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 3fa104d..e6e0737 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -28,6 +28,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -79,9 +80,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
             TupleFilter havingFilter, StorageContext context) {
         this.context = context;
 
-//        this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
-//        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
-//        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
+        this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
+        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
 
         this.cubeSegment = cubeSegment;
         this.cubeDesc = cubeSegment.getCubeDesc();
@@ -147,9 +148,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
      */
     public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
-//        this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
-//        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
-//        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
+        this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
+        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
 
         this.gtInfo = info;
 
@@ -174,7 +175,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
                     .setRtAggrMetrics(gtRtAggrMetrics).setDynamicColumns(gtDynColumns)
                     .setExprsPushDown(tupleExpressionMap)//
                     .setAllowStorageAggregation(context.isNeedStorageAggregation())
-                    .setAggCacheMemThreshold(0)//
+                    .setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB())//
                     .setStoragePushDownLimit(context.getFinalPushDownLimit())
                     .setStorageLimitLevel(context.getStorageLimitLevel()).setHavingFilterPushDown(havingFilter)
                     .createGTScanRequest();
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index b76fe47..3adbb8e 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -26,10 +26,12 @@ import java.util.Set;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dict.BuiltInFunctionTransformer;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.metadata.expression.TupleExpression;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.StringCodeSystem;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilterSerializer;
@@ -66,6 +68,10 @@ public class CubeSegmentScanner implements Iterable<GTRecord> {
         byte[] serialize = TupleFilterSerializer.serialize(originalfilter, StringCodeSystem.INSTANCE);
         TupleFilter filter = TupleFilterSerializer.deserialize(serialize, StringCodeSystem.INSTANCE);
 
+        // translate FunctionTupleFilter to IN clause
+        ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
+        filter = translator.transform(filter);
+
         CubeScanRangePlanner scanRangePlanner;
         try {
             scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups,
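
Note: BuiltInFunctionTransformer enumerates the dictionary of the filtered column and evaluates the built-in function against each value, rewriting e.g. UPPER(NAME) = 'ABC' into NAME IN (...) so the range planner can still prune. A sketch of the same pass used standalone (names assumed in scope):

    ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
    TupleFilter pushable = translator.transform(functionFilter); // function filter becomes an IN filter
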
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index db08e62..4fb71bb 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -36,6 +36,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
 import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
@@ -44,7 +45,6 @@ import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.cube.ILookupTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,7 +97,7 @@ public class CubeTupleConverter implements ITupleConverter {
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
         usedLookupTables = Lists.newArrayList();
-//        eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
+        eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
         autoJustByTimezone = eventTimezone.length() > 0
                 && cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable();
         if (autoJustByTimezone) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 63ef36d..804ce3f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,7 +33,6 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.ILookupTable;
 import org.apache.kylin.cube.RawQueryLastHacker;
 import org.apache.kylin.cube.common.SegmentPruner;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -42,6 +41,7 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index fea4550..9bfdd76 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -25,9 +25,9 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.ILookupTable;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 47a3ba7..574bb9f 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -18,673 +18,721 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.gridtable.CubeCodeSystem;
+import org.apache.kylin.dict.NumberDictionaryForestBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.gridtable.GTBuilder;
+import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.GridTable;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.ExtractTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
 
 public class DictGridTableTest extends LocalFileMetadataTestCase {
-//
-//    private GridTable table;
-//    private GTInfo info;
-//    private CompareTupleFilter timeComp0;
-//    private CompareTupleFilter timeComp1;
-//    private CompareTupleFilter timeComp2;
-//    private CompareTupleFilter timeComp3;
-//    private CompareTupleFilter timeComp4;
-//    private CompareTupleFilter timeComp5;
-//    private CompareTupleFilter timeComp6;
-//    private CompareTupleFilter timeComp7;
-//    private CompareTupleFilter ageComp1;
-//    private CompareTupleFilter ageComp2;
-//    private CompareTupleFilter ageComp3;
-//    private CompareTupleFilter ageComp4;
-//
-//    @After
-//    public void after() throws Exception {
-//
-//        this.cleanupTestMetadata();
-//    }
-//
-//    @Before
-//    public void setup() throws IOException {
-//
-//        this.createTestMetadata();
-//
-//        table = newTestTable();
-//        info = table.getInfo();
-//
-//        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
-//        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-//        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
-//        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
-//        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
-//        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
-//        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
-//        timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
-//        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
-//        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
-//        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
-//        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
-//
-//    }
-//
-//    @Test
-//    public void verifySegmentSkipping() {
-//
-//        ByteArray segmentStart = enc(info, 0, "2015-01-14");
-//        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
-//        assertEquals(segmentStart, segmentStartX);
-//
-//        {
-//            LogicalTupleFilter filter = and(timeComp0, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());//scan range are [close,close]
-//            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-//            assertEquals(1, r.get(0).fuzzyKeys.size());
-//            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-//        }
-//        {
-//            LogicalTupleFilter filter = and(timeComp2, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//        }
-//        {
-//            LogicalTupleFilter filter = and(timeComp4, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//        }
-//        {
-//            LogicalTupleFilter filter = and(timeComp5, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//        }
-//        {
-//            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
-//                    and(timeComp6, ageComp1));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(2, r.size());
-//            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
-//            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-//                    r.get(1).fuzzyKeys.toString());
-//        }
-//        {
-//            LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
-//        }
-//        {
-//            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(2, r.size());
-//            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
-//            assertEquals(0, r.get(1).fuzzyKeys.size());
-//        }
-//        {
-//            //skip FALSE filter
-//            LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(0, r.size());
-//        }
-//        {
-//            //TRUE or FALSE filter
-//            LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//            assertEquals("[null, null]-[null, null]", r.get(0).toString());
-//        }
-//        {
-//            //TRUE or other filter
-//            LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//            assertEquals("[null, null]-[null, null]", r.get(0).toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifySegmentSkipping2() {
-//        {
-//            LogicalTupleFilter filter = and(timeComp0, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());//scan range are [close,close]
-//            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-//            assertEquals(1, r.get(0).fuzzyKeys.size());
-//            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-//        }
-//
-//        {
-//            LogicalTupleFilter filter = and(timeComp5, ageComp1);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());//scan range are [close,close]
-//        }
-//    }
-//
-//    @Test
-//    public void verifyScanRangePlanner() {
-//
-//        // flatten or-and & hbase fuzzy value
-//        {
-//            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(1, r.size());
-//            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
-//            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
-//        }
-//
-//        // pre-evaluate ever false
-//        {
-//            LogicalTupleFilter filter = and(timeComp1, timeComp2);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(0, r.size());
-//        }
-//
-//        // pre-evaluate ever true
-//        {
-//            LogicalTupleFilter filter = or(timeComp1, ageComp4);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals("[[null, null]-[null, null]]", r.toString());
-//        }
-//
-//        // merge overlap range
-//        {
-//            LogicalTupleFilter filter = or(timeComp1, timeComp3);
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals("[[null, null]-[null, null]]", r.toString());
-//        }
-//
-//        // merge too many ranges
-//        {
-//            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
-//                    and(timeComp4, ageComp3));
-//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-//            List<GTScanRange> r = planner.planScanRanges();
-//            assertEquals(3, r.size());
-//            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
-//            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
-//            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
-//            planner.setMaxScanRanges(2);
-//            List<GTScanRange> r2 = planner.planScanRanges();
-//            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifyFirstRow() throws IOException {
-//        doScanAndVerify(table,
-//                new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
-//                        .setFilterPushDown(null).createGTScanRequest(),
-//                "[1421193600000, 30, Yang, 10, 10.5]", //
-//                "[1421193600000, 30, Luke, 10, 10.5]", //
-//                "[1421280000000, 20, Dong, 10, 10.5]", //
-//                "[1421280000000, 20, Jason, 10, 10.5]", //
-//                "[1421280000000, 30, Xu, 10, 10.5]", //
-//                "[1421366400000, 20, Mahone, 10, 10.5]", //
-//                "[1421366400000, 20, Qianhao, 10, 10.5]", //
-//                "[1421366400000, 30, George, 10, 10.5]", //
-//                "[1421366400000, 30, Shaofeng, 10, 10.5]", //
-//                "[1421452800000, 10, Kejia, 10, 10.5]");
-//    }
-//
-//    //for testing GTScanRequest serialization and deserialization
-//    public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
-//        ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-//        GTScanRequest.serializer.serialize(origin, buffer);
-//        buffer.flip();
-//        GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
-//
-//        Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
-//        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
-//        return sGTScanRequest;
-//    }
-//
-//    @Test
-//    public void verifyScanWithUnevaluatableFilter() throws IOException {
-//        GTInfo info = table.getInfo();
-//
-//        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-//        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
-//        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
-//        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
-//
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-//                .setFilterPushDown(filter).createGTScanRequest();
-//
-//        // note the unEvaluatable column 1 in filter is added to group by
-//        assertEquals(
-//                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-//                req.toString());
-//
-//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
-//                "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
-//                "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
-//    }
-//
-//    @Test
-//    public void verifyScanWithEvaluatableFilter() throws IOException {
-//        GTInfo info = table.getInfo();
-//
-//        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-//        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
-//        LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-//                .setFilterPushDown(filter).createGTScanRequest();
-//        // note the evaluatable column 1 in filter is added to returned columns but not in group by
-//        assertEquals(
-//                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-//                req.toString());
-//
-//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
-//                "[1421366400000, 20, null, 40, null]");
-//    }
-//
-//    @Test
-//    public void verifyAggregateAndHavingFilter() throws IOException {
-//        GTInfo info = table.getInfo();
-//
-//        TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
-//        havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
-//        CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
-//
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
-//                .setHavingFilterPushDown(havingFilter).createGTScanRequest();
-//
-//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
-//                "[null, 30, null, null, 52.5]");
-//    }
-//
-//    @SuppressWarnings("unused")
-//    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
-//            throws IOException {
-//        long start = System.currentTimeMillis();
-//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-//                .setFilterPushDown(filter).createGTScanRequest();
-//        int i = 0;
-//        try (IGTScanner scanner = table.scan(req)) {
-//            for (GTRecord r : scanner) {
-//                i++;
-//            }
-//        }
-//        long end = System.currentTimeMillis();
-//        System.out.println(
-//                (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants1() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
-//        LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//        assertEquals(
-//                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
-//                newFilter.toString());
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants2() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//
-//        // $1<"9" round down to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<"10" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<"11" round down to <="10"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<="9" round down to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(ConstantTupleFilter.FALSE, newFilter);
-//        }
-//
-//        // $1<="10" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1<="11" round down to <="10"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-//                    newFilter.toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants3() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//
-//        // $1>"101" round up to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>"100" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>"99" round up to >="100"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>="101" round up to FALSE
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(ConstantTupleFilter.FALSE, newFilter);
-//        }
-//
-//        // $1>="100" needs no rounding
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-//                    newFilter.toString());
-//        }
-//
-//        // $1>="99" round up to >="100"
-//        {
-//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
-//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//            assertEquals(
-//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-//                    newFilter.toString());
-//        }
-//    }
-//
-//    @Test
-//    public void verifyConvertFilterConstants4() {
-//        GTInfo info = table.getInfo();
-//
-//        TableDesc extTable = TableDesc.mockup("ext");
-//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-//
-//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-//        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
-//        LogicalTupleFilter filter = and(fComp1, fComp2);
-//
-//        List<TblColRef> colMapping = Lists.newArrayList();
-//        colMapping.add(extColA);
-//        colMapping.add(extColB);
-//
-//        // $1 in ("9", "10", "15") has only "10" left
-//        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-//        assertEquals(
-//                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
-//                newFilter.toString());
-//    }
-//
-//    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
-//        System.out.println(req);
-//        try (IGTScanner scanner = table.scan(req)) {
-//            int i = 0;
-//            for (GTRecord r : scanner) {
-//                System.out.println(r);
-//                if (verifyRows == null || i >= verifyRows.length) {
-//                    Assert.fail();
-//                }
-//                assertEquals(verifyRows[i], r.toString());
-//                i++;
-//            }
-//        }
-//    }
-//
-//    public static ByteArray enc(GTInfo info, int col, String value) {
-//        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
-//        info.getCodeSystem().encodeColumnValue(col, value, buf);
-//        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
-//    }
-//
-//    public static ExtractTupleFilter unevaluatable(TblColRef col) {
-//        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
-//        r.addChild(new ColumnTupleFilter(col));
-//        return r;
-//    }
-//
-//    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
-//        CompareTupleFilter result = new CompareTupleFilter(op);
-//        result.addChild(new ColumnTupleFilter(col));
-//        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
-//        return result;
-//    }
-//
-//    public static LogicalTupleFilter and(TupleFilter... children) {
-//        return logic(FilterOperatorEnum.AND, children);
-//    }
-//
-//    public static LogicalTupleFilter or(TupleFilter... children) {
-//        return logic(FilterOperatorEnum.OR, children);
-//    }
-//
-//    public static LogicalTupleFilter not(TupleFilter child) {
-//        return logic(FilterOperatorEnum.NOT, child);
-//    }
-//
-//    public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
-//        LogicalTupleFilter result = new LogicalTupleFilter(op);
-//        for (TupleFilter c : children) {
-//            result.addChild(c);
-//        }
-//        return result;
-//    }
-//
-//    public static GridTable newTestTable() throws IOException {
-//        GTInfo info = newInfo();
-//        GTSimpleMemStore store = new GTSimpleMemStore(info);
-//        GridTable table = new GridTable(info, store);
-//
-//        GTRecord r = new GTRecord(table.getInfo());
-//        GTBuilder builder = table.rebuild();
-//
-//        builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-//        builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-//        builder.close();
-//
-//        return table;
-//    }
-//
-//    static GridTable newTestPerfTable() throws IOException {
-//        GTInfo info = newInfo();
-//        GTSimpleMemStore store = new GTSimpleMemStore(info);
-//        GridTable table = new GridTable(info, store);
-//
-//        GTRecord r = new GTRecord(table.getInfo());
-//        GTBuilder builder = table.rebuild();
-//
-//        for (int i = 0; i < 100000; i++) {
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-//
-//            for (int j = 0; j < 10; j++)
-//                builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-//        }
-//        builder.close();
-//
-//        return table;
-//    }
-//
-//    static GTInfo newInfo() {
-//        Builder builder = GTInfo.builder();
-//        builder.setCodeSystem(newDictCodeSystem());
-//        builder.setColumns(//
-//                DataType.getType("timestamp"), //
-//                DataType.getType("integer"), //
-//                DataType.getType("varchar(10)"), //
-//                DataType.getType("bigint"), //
-//                DataType.getType("decimal") //
-//        );
-//        builder.setPrimaryKey(setOf(0, 1));
-//        builder.setColumnPreferIndex(setOf(0));
-//        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
-//        builder.enableRowBlock(4);
-//        GTInfo info = builder.build();
-//        return info;
-//    }
-//
-//    private static CubeCodeSystem newDictCodeSystem() {
-//        DimensionEncoding[] dimEncs = new DimensionEncoding[3];
-//        dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
-//        dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
-//        return new CubeCodeSystem(dimEncs);
-//    }
-//
-//    private static Dictionary newDictionaryOfString() {
-//        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
-//        builder.addValue("Dong");
-//        builder.addValue("George");
-//        builder.addValue("Jason");
-//        builder.addValue("Kejia");
-//        builder.addValue("Luke");
-//        builder.addValue("Mahone");
-//        builder.addValue("Qianhao");
-//        builder.addValue("Shaofeng");
-//        builder.addValue("Xu");
-//        builder.addValue("Yang");
-//        return builder.build(0);
-//    }
-//
-//    private static Dictionary newDictionaryOfInteger() {
-//        NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
-//        builder.addValue("10");
-//        builder.addValue("20");
-//        builder.addValue("30");
-//        builder.addValue("40");
-//        builder.addValue("50");
-//        builder.addValue("60");
-//        builder.addValue("70");
-//        builder.addValue("80");
-//        builder.addValue("90");
-//        builder.addValue("100");
-//        return builder.build();
-//    }
-//
-//    public static ImmutableBitSet setOf(int... values) {
-//        BitSet set = new BitSet();
-//        for (int i : values)
-//            set.set(i);
-//        return new ImmutableBitSet(set);
-//    }
+
+    private GridTable table;
+    private GTInfo info;
+    private CompareTupleFilter timeComp0;
+    private CompareTupleFilter timeComp1;
+    private CompareTupleFilter timeComp2;
+    private CompareTupleFilter timeComp3;
+    private CompareTupleFilter timeComp4;
+    private CompareTupleFilter timeComp5;
+    private CompareTupleFilter timeComp6;
+    private CompareTupleFilter timeComp7;
+    private CompareTupleFilter ageComp1;
+    private CompareTupleFilter ageComp2;
+    private CompareTupleFilter ageComp3;
+    private CompareTupleFilter ageComp4;
+
+    @After
+    public void after() throws Exception {
+
+        this.cleanupTestMetadata();
+    }
+
+    @Before
+    public void setup() throws IOException {
+
+        this.createTestMetadata();
+
+        table = newTestTable();
+        info = table.getInfo();
+
+        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
+        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
+        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
+        timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
+        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+
+    }
+
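+    // Expected range strings print the partition column as epoch millis:
+    // 2015-01-14 UTC == 1421193600000, and each later day adds 86400000.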
+    @Test
+    public void verifySegmentSkipping() {
+
+        ByteArray segmentStart = enc(info, 0, "2015-01-14");
+        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00"); // when the partition column is dict-encoded, the time format is flexible
+        assertEquals(segmentStart, segmentStartX);
+
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size()); // scan ranges are [closed, closed]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp2, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp4, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+        }
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+        }
+        {
+            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
+                    and(timeComp6, ageComp1));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
+            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
+                    r.get(1).fuzzyKeys.toString());
+        }
+        {
+            LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
+        }
+        {
+            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(2, r.size());
+            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+            assertEquals(0, r.get(1).fuzzyKeys.size());
+        }
+        {
+            // skip a FALSE filter
+            LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+        {
+            // TRUE or FALSE filter
+            LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+        }
+        {
+            // TRUE or another filter
+            LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+        }
+    }
+
+    @Test
+    public void verifySegmentSkipping2() {
+        {
+            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size()); // scan ranges are [closed, closed]
+            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+            assertEquals(1, r.get(0).fuzzyKeys.size());
+            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+
+        {
+            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size()); // scan ranges are [closed, closed]
+        }
+    }
+
+    @Test
+    public void verifyScanRangePlanner() {
+
+        // flatten or-and and compute HBase fuzzy-key values
+        {
+            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(1, r.size());
+            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
+            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
+        }
+
+        // pre-evaluation finds the filter always false
+        {
+            LogicalTupleFilter filter = and(timeComp1, timeComp2);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(0, r.size());
+        }
+
+        // pre-evaluation finds the filter always true
+        {
+            LogicalTupleFilter filter = or(timeComp1, ageComp4);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+
+        // merge overlapping ranges
+        {
+            LogicalTupleFilter filter = or(timeComp1, timeComp3);
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals("[[null, null]-[null, null]]", r.toString());
+        }
+
+        // collapse ranges when they exceed the configured maximum
+        {
+            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
+                    and(timeComp4, ageComp3));
+            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+            List<GTScanRange> r = planner.planScanRanges();
+            assertEquals(3, r.size());
+            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+            planner.setMaxScanRanges(2);
+            List<GTScanRange> r2 = planner.planScanRanges();
+            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+        }
+    }
+
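+    // a full scan with no filter or aggregation should return all ten rows in primary-key order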
+    @Test
+    public void verifyFirstRow() throws IOException {
+        doScanAndVerify(table,
+                new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
+                        .setFilterPushDown(null).createGTScanRequest(),
+                "[1421193600000, 30, Yang, 10, 10.5]", //
+                "[1421193600000, 30, Luke, 10, 10.5]", //
+                "[1421280000000, 20, Dong, 10, 10.5]", //
+                "[1421280000000, 20, Jason, 10, 10.5]", //
+                "[1421280000000, 30, Xu, 10, 10.5]", //
+                "[1421366400000, 20, Mahone, 10, 10.5]", //
+                "[1421366400000, 20, Qianhao, 10, 10.5]", //
+                "[1421366400000, 30, George, 10, 10.5]", //
+                "[1421366400000, 30, Shaofeng, 10, 10.5]", //
+                "[1421452800000, 10, Kejia, 10, 10.5]");
+    }
+
+    // for testing GTScanRequest serialization and deserialization
+    public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
+        ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
+        GTScanRequest.serializer.serialize(origin, buffer);
+        buffer.flip();
+        GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
+
+        Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
+        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
+        return sGTScanRequest;
+    }
+
+    @Test
+    public void verifyScanWithUnevaluatableFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+                .setFilterPushDown(filter).createGTScanRequest();
+
+        // note that the unevaluatable column 1 in the filter is added to the group by
+        assertEquals(
+                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+                req.toString());
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
+                "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
+                "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
+    }
+
+    @Test
+    public void verifyScanWithEvaluatableFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+                .setFilterPushDown(filter).createGTScanRequest();
+        // note that the evaluatable column 1 in the filter is added to the returned columns but not to the group by
+        assertEquals(
+                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+                req.toString());
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
+                "[1421366400000, 20, null, 40, null]");
+    }
+
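+    // groups by age (column 1) and sums the decimal measure (column 4); the having
+    // filter keeps only groups whose sum exceeds 20, i.e. ages 20 and 30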
+    @Test
+    public void verifyAggregateAndHavingFilter() throws IOException {
+        GTInfo info = table.getInfo();
+
+        TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
+        havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
+        CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
+
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
+                .setHavingFilterPushDown(havingFilter).createGTScanRequest();
+
+        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
+                "[null, 30, null, null, 52.5]");
+    }
+
+    @SuppressWarnings("unused")
+    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
+            throws IOException {
+        long start = System.currentTimeMillis();
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+                .setFilterPushDown(filter).createGTScanRequest();
+        int i = 0;
+        try (IGTScanner scanner = table.scan(req)) {
+            for (GTRecord r : scanner) {
+                i++;
+            }
+        }
+        long end = System.currentTimeMillis();
+        System.out.println(
+                (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
+    }
+
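+    // colMapping positions the external columns onto GT columns 0 and 1, so the
+    // string constants get re-encoded with those columns' encodings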
+    @Test
+    public void verifyConvertFilterConstants1() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals(
+                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
+                newFilter.toString());
+    }
+
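+    // the integer dictionary only holds 10, 20, ..., 100, so LT/LTE constants missing
+    // from it are rounded down and the operator relaxed (e.g. LT "11" becomes LTE "10")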
+    @Test
+    public void verifyConvertFilterConstants2() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+
+        // $1<"9" round down to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+                    newFilter.toString());
+        }
+
+        // $1<"10" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
+                    newFilter.toString());
+        }
+
+        // $1<"11" round down to <="10"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+                    newFilter.toString());
+        }
+
+        // $1<="9" round down to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(ConstantTupleFilter.FALSE, newFilter);
+        }
+
+        // $1<="10" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+                    newFilter.toString());
+        }
+
+        // $1<="11" round down to <="10"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+                    newFilter.toString());
+        }
+    }
+
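+    // mirror of the previous test for GT/GTE: missing constants are rounded up
+    // (e.g. GT "99" becomes GTE "100"); anything beyond the dictionary maximum folds away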
+    @Test
+    public void verifyConvertFilterConstants3() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+
+        // $1>"101" round up to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+                    newFilter.toString());
+        }
+
+        // $1>"100" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
+                    newFilter.toString());
+        }
+
+        // $1>"99" round up to >="100"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+                    newFilter.toString());
+        }
+
+        // $1>="101" round up to FALSE
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(ConstantTupleFilter.FALSE, newFilter);
+        }
+
+        // $1>="100" needs no rounding
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+                    newFilter.toString());
+        }
+
+        // $1>="99" round up to >="100"
+        {
+            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
+            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+            assertEquals(
+                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+                    newFilter.toString());
+        }
+    }
+
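+    // for IN, constants absent from the dictionary are simply dropped from the value set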
+    @Test
+    public void verifyConvertFilterConstants4() {
+        GTInfo info = table.getInfo();
+
+        TableDesc extTable = TableDesc.mockup("ext");
+        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+
+        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+        LogicalTupleFilter filter = and(fComp1, fComp2);
+
+        List<TblColRef> colMapping = Lists.newArrayList();
+        colMapping.add(extColA);
+        colMapping.add(extColB);
+
+        // $1 in ("9", "10", "15") has only "10" left
+        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+        assertEquals(
+                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
+                newFilter.toString());
+    }
+
+    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
+        System.out.println(req);
+        try (IGTScanner scanner = table.scan(req)) {
+            int i = 0;
+            for (GTRecord r : scanner) {
+                System.out.println(r);
+                if (verifyRows == null || i >= verifyRows.length) {
+                    Assert.fail();
+                }
+                assertEquals(verifyRows[i], r.toString());
+                i++;
+            }
+        }
+    }
+
+    public static ByteArray enc(GTInfo info, int col, String value) {
+        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+        info.getCodeSystem().encodeColumnValue(col, value, buf);
+        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
+    }
+
+    public static ExtractTupleFilter unevaluatable(TblColRef col) {
+        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+        r.addChild(new ColumnTupleFilter(col));
+        return r;
+    }
+
+    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
+        CompareTupleFilter result = new CompareTupleFilter(op);
+        result.addChild(new ColumnTupleFilter(col));
+        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
+        return result;
+    }
+
+    public static LogicalTupleFilter and(TupleFilter... children) {
+        return logic(FilterOperatorEnum.AND, children);
+    }
+
+    public static LogicalTupleFilter or(TupleFilter... children) {
+        return logic(FilterOperatorEnum.OR, children);
+    }
+
+    public static LogicalTupleFilter not(TupleFilter child) {
+        return logic(FilterOperatorEnum.NOT, child);
+    }
+
+    public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
+        LogicalTupleFilter result = new LogicalTupleFilter(op);
+        for (TupleFilter c : children) {
+            result.addChild(c);
+        }
+        return result;
+    }
+
+    public static GridTable newTestTable() throws IOException {
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo());
+        GTBuilder builder = table.rebuild();
+
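+        // 10 rows, written in primary-key order (timestamp, then the integer dimension).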
+        builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+        builder.close();
+
+        return table;
+    }
+
+    static GridTable newTestPerfTable() throws IOException {
+        GTInfo info = newInfo();
+        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GridTable table = new GridTable(info, store);
+
+        GTRecord r = new GTRecord(table.getInfo());
+        GTBuilder builder = table.rebuild();
+
+        for (int i = 0; i < 100000; i++) {
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+
+            for (int j = 0; j < 10; j++)
+                builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+        }
+        builder.close();
+
+        return table;
+    }
+
+    static GTInfo newInfo() {
+        Builder builder = GTInfo.builder();
+        builder.setCodeSystem(newDictCodeSystem());
+        builder.setColumns(//
+                DataType.getType("timestamp"), //
+                DataType.getType("integer"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("bigint"), //
+                DataType.getType("decimal") //
+        );
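+        // Columns 0-1 form the primary key; the column blocks separate the
+        // rowkey, the varchar dimension, and the two measure columns.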
+        builder.setPrimaryKey(setOf(0, 1));
+        builder.setColumnPreferIndex(setOf(0));
+        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
+        builder.enableRowBlock(4);
+        GTInfo info = builder.build();
+        return info;
+    }
+
+    private static CubeCodeSystem newDictCodeSystem() {
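+        // dimEncs[0] (the timestamp) is left null and handled by the default
+        // serializer; the integer and varchar dimensions are dictionary-encoded.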
+        DimensionEncoding[] dimEncs = new DimensionEncoding[3];
+        dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
+        dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
+        return new CubeCodeSystem(dimEncs);
+    }
+
+    private static Dictionary newDictionaryOfString() {
+        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        builder.addValue("Dong");
+        builder.addValue("George");
+        builder.addValue("Jason");
+        builder.addValue("Kejia");
+        builder.addValue("Luke");
+        builder.addValue("Mahone");
+        builder.addValue("Qianhao");
+        builder.addValue("Shaofeng");
+        builder.addValue("Xu");
+        builder.addValue("Yang");
+        return builder.build(0);
+    }
+
+    private static Dictionary newDictionaryOfInteger() {
+        NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
+        builder.addValue("10");
+        builder.addValue("20");
+        builder.addValue("30");
+        builder.addValue("40");
+        builder.addValue("50");
+        builder.addValue("60");
+        builder.addValue("70");
+        builder.addValue("80");
+        builder.addValue("90");
+        builder.addValue("100");
+        return builder.build();
+    }
+
+    public static ImmutableBitSet setOf(int... values) {
+        BitSet set = new BitSet();
+        for (int i : values)
+            set.set(i);
+        return new ImmutableBitSet(set);
+    }
 }
diff --git a/pom.xml b/pom.xml
index 3fe15f2..75d33a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -284,11 +284,11 @@
         <artifactId>kylin-core-metadata</artifactId>
         <version>${project.version}</version>
       </dependency>
-<!--      <dependency>-->
-<!--        <groupId>org.apache.kylin</groupId>-->
-<!--        <artifactId>kylin-core-dictionary</artifactId>-->
-<!--        <version>${project.version}</version>-->
-<!--      </dependency>-->
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-core-dictionary</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-core-cube</artifactId>
@@ -1530,7 +1530,7 @@
     <module>external</module>
     <module>core-common</module>
     <module>core-metadata</module>
-<!--    <module>core-dictionary</module>-->
+    <module>core-dictionary</module>
     <module>core-cube</module>
     <module>core-job</module>
     <module>core-storage</module>
@@ -1602,7 +1602,7 @@
 
   <profiles>
     <profile>
-      <id>spark2</id>
+      <id>sandbox</id>
       <activation>
         <activeByDefault>true</activeByDefault>
         <property>
@@ -1745,6 +1745,114 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>cdh5.7</id>
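+      <!-- Pin the Hadoop stack to CDH 5.7 distribution artifacts. -->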
+      <properties>
+        <hadoop2.version>2.6.0-cdh5.7.0</hadoop2.version>
+        <yarn.version>2.6.0-cdh5.7.0</yarn.version>
+        <hive.version>1.1.0-cdh5.7.0</hive.version>
+        <hive-hcatalog.version>1.1.0-cdh5.7.0</hive-hcatalog.version>
+        <hbase-hadoop2.version>1.2.0-cdh5.7.0</hbase-hadoop2.version>
+        <zookeeper.version>3.4.5-cdh5.7.0</zookeeper.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <fork>true</fork>
+              <meminitial>1024m</meminitial>
+              <maxmem>2048m</maxmem>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>copy-jamm</id>
+                <goals>
+                  <goal>copy</goal>
+                </goals>
+                <phase>generate-test-resources</phase>
+                <configuration>
+                  <artifactItems>
+                    <artifactItem>
+                      <groupId>com.github.jbellis</groupId>
+                      <artifactId>jamm</artifactId>
+                      <outputDirectory>${project.build.testOutputDirectory}</outputDirectory>
+                      <destFileName>jamm.jar</destFileName>
+                    </artifactItem>
+                  </artifactItems>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.jacoco</groupId>
+            <artifactId>jacoco-maven-plugin</artifactId>
+            <configuration>
+              <append>true</append>
+              <destFile>${sonar.jacoco.reportPaths}</destFile>
+            </configuration>
+            <executions>
+              <execution>
+                <id>pre-test</id>
+                <goals>
+                  <goal>prepare-agent</goal>
+                </goals>
+                <configuration>
+                  <propertyName>surefireArgLine</propertyName>
+                </configuration>
+              </execution>
+              <execution>
+                <id>post-test</id>
+                <phase>test</phase>
+                <goals>
+                  <goal>report</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>2.21.0</version>
+            <configuration>
+              <reportsDirectory>${project.basedir}/../target/surefire-reports</reportsDirectory>
+              <excludes>
+                <exclude>**/IT*.java</exclude>
+                <exclude>org.apache.kylin.engine.spark2.NManualBuildAndQueryCuboidTest</exclude>
+                <exclude>org.apache.kylin.engine.spark2.NBuildAndQueryTest</exclude>
+                <exclude>org.apache.kylin.engine.spark2.NBadQueryAndPushDownTest</exclude>
+              </excludes>
+              <systemProperties>
+                <property>
+                  <name>buildCubeUsingProvidedData</name>
+                  <value>false</value>
+                </property>
+                <property>
+                  <name>log4j.configuration</name>
+                  <value>file:${project.basedir}/../build/conf/kylin-tools-log4j.properties</value>
+                </property>
+              </systemProperties>
+              <argLine>-javaagent:${project.build.testOutputDirectory}/jamm.jar
+                ${argLine} ${surefireArgLine}
+              </argLine>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
 <profile>
 <!-- This profile adds/overrides few features of the 'apache-release'
      profile in the parent pom. -->
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
new file mode 100644
index 0000000..ad2e20c
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.enumerator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.schema.OLAPTable;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enumerates the rows of a lookup table snapshot for queries that select from
+ * a lookup (dimension) table only.
+ */
+public class LookupTableEnumerator implements Enumerator<Object[]> {
+    private static final Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
+
+    private final ILookupTable lookupTable;
+    private final List<ColumnDesc> colDescs;
+    private final Object[] current;
+    private Iterator<String[]> iterator;
+
+    public LookupTableEnumerator(OLAPContext olapContext) {
+
+        // TODO: assumes the lookup table is served by a cube (or hybrid) realization
+        CubeInstance cube = null;
+
+        if (olapContext.realization instanceof CubeInstance) {
+            cube = (CubeInstance) olapContext.realization;
+            ProjectInstance project = cube.getProjectInstance();
+            List<RealizationEntry> realizationEntries = project.getRealizationEntries();
+            String lookupTableName = olapContext.firstTableScan.getTableName();
+            CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
+
+            // Honor the force-hit-cube backdoor toggle when picking the cube for this lookup
+            String forceHitCubeName = BackdoorToggles.getForceHitCube();
+            if (!StringUtil.isEmpty(forceHitCubeName)) {
+                String forceHitCubeNameLower = forceHitCubeName.toLowerCase(Locale.ROOT);
+                String[] forceHitCubeNames = forceHitCubeNameLower.split(",");
+                final Set<String> forceHitCubeNameSet = new HashSet<String>(Arrays.asList(forceHitCubeNames));
+                // Materialize the filtered entries into a List; casting the raw
+                // Stream would throw ClassCastException at runtime.
+                cube = cubeMgr.findLatestSnapshot(
+                        realizationEntries.stream()
+                                .filter(x -> forceHitCubeNameSet.contains(x.getRealization().toLowerCase(Locale.ROOT)))
+                                .collect(Collectors.toList()),
+                        lookupTableName, cube);
+                olapContext.realization = cube;
+            } else {
+                cube = cubeMgr.findLatestSnapshot(realizationEntries, lookupTableName, cube);
+                olapContext.realization = cube;
+            }
+        } else if (olapContext.realization instanceof HybridInstance) {
+            final HybridInstance hybridInstance = (HybridInstance) olapContext.realization;
+            final IRealization latestRealization = hybridInstance.getLatestRealization();
+            if (latestRealization instanceof CubeInstance) {
+                cube = (CubeInstance) latestRealization;
+            } else {
+                throw new IllegalStateException(
+                        "Hybrid's latest realization is not a cube: " + latestRealization);
+            }
+        }
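+
+        // The snapshot lookup below requires a cube; fail fast if the
+        // realization was neither a cube nor a hybrid.
+        if (cube == null)
+            throw new IllegalStateException(
+                    "Lookup query is not backed by a cube: " + olapContext.realization);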
+
+        String lookupTableName = olapContext.firstTableScan.getTableName();
+        DimensionDesc dim = cube.getDescriptor().findDimensionByTable(lookupTableName);
+        if (dim == null)
+            throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
+
+        CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
+        this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
+
+        OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
+        this.colDescs = olapTable.getSourceColumns();
+        this.current = new Object[colDescs.size()];
+
+        reset();
+    }
+
+    @Override
+    public boolean moveNext() {
+        boolean hasNext = iterator.hasNext();
+        if (hasNext) {
+            String[] row = iterator.next();
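+            // Convert each snapshot cell from its stored String form to the
+            // value type Calcite expects for the column.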
+            for (int i = 0, n = colDescs.size(); i < n; i++) {
+                ColumnDesc colDesc = colDescs.get(i);
+                int colIdx = colDesc.getZeroBasedIndex();
+                if (colIdx >= 0) {
+                    current[i] = Tuple.convertOptiqCellValue(row[colIdx], colDesc.getUpgradedType().getName());
+                } else {
+                    current[i] = null; // fake column
+                }
+            }
+        }
+        return hasNext;
+    }
+
+    @Override
+    public Object[] current() {
+        // NOTE: without this copy, sql_lookup/query03.sql yields a messy result, which is
+        // odd since the other lookup queries are all fine.
+        return Arrays.copyOf(current, current.length);
+    }
+
+    @Override
+    public void reset() {
+        this.iterator = lookupTable.iterator();
+    }
+
+    @Override
+    public void close() {
+        try {
+            lookupTable.close();
+        } catch (IOException e) {
+            logger.error("error when close lookup table", e);
+        }
+    }
+
+}
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index 6c0b5b1..c094ff5 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -64,6 +64,10 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
         case OLAP:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator()
                     : new OLAPEnumerator(olapContext, optiqContext);
+        case LOOKUP_TABLE:
+            return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new LookupTableEnumerator(olapContext);
+        case COL_DICT:
+            return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new DictionaryEnumerator(olapContext);
         case HIVE:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new HiveEnumerator(olapContext);
         default:
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 6ab86a8..bfd6c4d 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -54,7 +55,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
 
     ColumnRowType columnRowType;
     OLAPContext context;
-    boolean autoJustTimezone = false;
+    boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
 
     public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
         super(cluster, traits, child, condition);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index dd79a4f..ffff10b 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -37,6 +37,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.util.NlsString;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.filter.CaseTupleFilter;
@@ -59,6 +60,7 @@ import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
@@ -68,7 +70,8 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
     // whether the fact table is a streaming v2 table
     private boolean autoJustByTimezone = false;
-    private static final long TIME_ZONE_OFFSET = 0;
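+    // Raw offset (ms) of the configured streaming derived-time timezone; an
+    // empty setting resolves to GMT, i.e. an offset of 0.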
+    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
+            .getRawOffset();
 
     public TupleFilterVisitor(ColumnRowType inputRowType) {
         super(true);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a8285ae..00370e3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
@@ -308,20 +308,20 @@ public class CubeController extends BasicController {
      *
      * @throws IOException
      */
-//    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
-//            RequestMethod.PUT }, produces = { "application/json" })
-//    @ResponseBody
-//    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
-//            @RequestParam(value = "lookupTable") String lookupTable) {
-//        try {
-//            final CubeManager cubeMgr = cubeService.getCubeManager();
-//            final CubeInstance cube = cubeMgr.getCube(cubeName);
-//            return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
-//        } catch (IOException e) {
-//            logger.error(e.getLocalizedMessage(), e);
-//            throw new InternalErrorException(e.getLocalizedMessage(), e);
-//        }
-//    }
+    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
+            RequestMethod.PUT }, produces = { "application/json" })
+    @ResponseBody
+    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
+            @RequestParam(value = "lookupTable") String lookupTable) {
+        try {
+            final CubeManager cubeMgr = cubeService.getCubeManager();
+            final CubeInstance cube = cubeMgr.getCube(cubeName);
+            return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
+        } catch (IOException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage(), e);
+        }
+    }
 
     /**
      * Delete a cube segment
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index dd45599..bd30109 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -64,7 +64,7 @@ import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-//import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -592,20 +592,20 @@ public class CubeService extends BasicService implements InitializingBean {
         return getCubeManager().updateCube(update);
     }
 
-//    public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
-//            throws IOException {
-//        aclEvaluate.checkProjectOperationPermission(cube);
-//        Message msg = MsgPicker.getMsg();
-//        TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
-//        if (tableDesc.isView()) {
-//            throw new BadRequestException(
-//                    String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
-//        }
-//        CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
-//        getCubeManager().buildSnapshotTable(seg, lookupTable, null);
-//
-//        return cube;
-//    }
+    public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
+            throws IOException {
+        aclEvaluate.checkProjectOperationPermission(cube);
+        Message msg = MsgPicker.getMsg();
+        TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
+        if (tableDesc.isView()) {
+            throw new BadRequestException(
+                    String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
+        }
+        CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
+        getCubeManager().buildSnapshotTable(seg, lookupTable, null);
+
+        return cube;
+    }
 
     public CubeInstance deleteSegmentById(CubeInstance cube, String uuid) throws IOException {
         aclEvaluate.checkProjectWritePermission(cube);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 966fd1f..6f282ff 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -47,6 +47,9 @@ import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.spark.source.CsvSource;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.CsvColumnDesc;
@@ -60,6 +63,8 @@ import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.response.TableSnapshotResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
@@ -357,32 +362,38 @@ public class TableService extends BasicService {
 //    }
 
     public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
-        return Lists.newArrayList();
+        TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
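+        // CSV-backed tables carry no Hive table signature, so there is no
+        // snapshot history to report.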
+        if (SourceManager.getSource(tableDesc).getClass() == CsvSource.class) {
+            return new ArrayList<>();
+        }
+        IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
+        TableSignature signature = hiveTable.getSignature();
+        return internalGetLookupTableSnapshots(tableName, signature);
+    }
+
+    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
+            throws IOException {
+        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
+        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
+
+        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
+
+        List<TableSnapshotResponse> result = Lists.newArrayList();
+
+        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
+            TableSnapshotResponse response = new TableSnapshotResponse();
+            response.setSnapshotID(metaStoreTableSnapshot.getId());
+            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
+            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
+            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
+            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
+            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
+            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
+            result.add(response);
+        }
+
+        return result;
     }
-//
-//    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
-//            throws IOException {
-//        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
-//        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
-//
-//        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
-//
-//        List<TableSnapshotResponse> result = Lists.newArrayList();
-//
-//        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
-//            TableSnapshotResponse response = new TableSnapshotResponse();
-//            response.setSnapshotID(metaStoreTableSnapshot.getId());
-//            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
-//            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
-//            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
-//            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
-//            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
-//            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
-//            result.add(response);
-//        }
-//
-//        return result;
-//    }
 
     /**
      * @return Map of SnapshotID, CubeName or SegmentName list
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 6236ef9..8ff5719 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Base64;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Locale;
 import java.util.Collections;
@@ -92,15 +93,15 @@ public class HiveInputBase {
 
             // create global dict
             KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
-//            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
-//            if (mrHiveDictColumns.length > 0) {
-//                String globalDictDatabase = dictConfig.getMrHiveDictDB();
-//                if (null == globalDictDatabase) {
-//                    throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
-//                }
-//                String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
-//                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
-//            }
+            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+            if (mrHiveDictColumns.length > 0) {
+                String globalDictDatabase = dictConfig.getMrHiveDictDB();
+                if (null == globalDictDatabase) {
+                    throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
+                }
+                String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
+                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
+            }
 
             // then count and redistribute
             if (cubeConfig.isHiveRedistributeEnabled()) {
@@ -288,13 +289,13 @@ public class HiveInputBase {
             deleteTables.add(getIntermediateTableIdentity());
 
             // mr-hive dict and inner table do not need delete hdfs
-//            String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
-//            if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
-//                String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
-//                String tableName = dictDb + "." + flatDesc.getTableName() + "_"
-//                        + MRHiveDictUtil.DictHiveType.GroupBy.getName();
-//                deleteTables.add(tableName);
-//            }
+            String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
+            if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
+                String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
+                String tableName = dictDb + "." + flatDesc.getTableName() + "_"
+                        + MRHiveDictUtil.DictHiveType.GroupBy.getName();
+                deleteTables.add(tableName);
+            }
             step.setIntermediateTables(deleteTables);
 
             step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));