You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/07/05 12:59:11 UTC

[kylin] branch kylin-on-parquet-v2 updated: Remove unused property, class and maven module which is for Kylin 3 only

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new e9291e3  Remove unused property, class and maven module which is for Kylin 3 only
e9291e3 is described below

commit e9291e3b232cb628bbfa8dc9fc21fbab1d491a19
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Mon Jul 5 20:28:37 2021 +0800

    Remove unused property, class and maven module which is for Kylin 3 only
---
 .../kylin/engine/mr/SortedColumnDFSFile.java       |    6 +-
 .../engine/mr/common/MapReduceExecutable.java      |    6 -
 .../kylin/engine/mr/common/MapReduceUtil.java      |  267 ++--
 .../org/apache/kylin/common/KylinConfigBase.java   |  803 ++++--------
 .../apache/kylin/common/annotation/ConfigTag.java  |    3 +
 .../org/apache/kylin/common/util/HadoopUtil.java   |    4 -
 .../org/apache/kylin/common/KylinConfigTest.java   |   20 +-
 core-cube/pom.xml                                  |    8 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |  341 +++--
 .../java/org/apache/kylin/cube/CubeSegment.java    |   34 +-
 .../java/org/apache/kylin/cube}/ILookupTable.java  |    2 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     |  187 ---
 .../apache/kylin/cube/cli/DumpDictionaryCLI.java   |   62 -
 .../java/org/apache/kylin/cube/model/CubeDesc.java |   46 +-
 .../apache/kylin/cube/model/SnapshotTableDesc.java |   10 +-
 .../cube/model/validation/rule/DictionaryRule.java |    6 -
 .../org/apache/kylin/cube/util/CubingUtils.java    |  284 ++--
 .../model/validation/rule/DictionaryRuleTest.java  |   14 -
 .../org/apache/kylin/dict/DictionaryGenerator.java |  382 +++---
 .../apache/kylin/dict/lookup/SnapshotManager.java  |   12 +-
 .../java/org/apache/kylin/job/JoinedFlatTable.java |   22 +-
 .../kylin/dimension/DimensionEncodingFactory.java  |    1 +
 .../org/apache/kylin/storage/StorageFactory.java   |   13 -
 .../storage/gtrecord/CubeScanRangePlanner.java     |   15 +-
 .../kylin/storage/gtrecord/CubeSegmentScanner.java |    6 -
 .../kylin/storage/gtrecord/CubeTupleConverter.java |    4 +-
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |    2 +-
 .../storage/translate/DerivedFilterTranslator.java |    2 +-
 .../kylin/storage/gtrecord/DictGridTableTest.java  | 1380 ++++++++++----------
 pom.xml                                            |  122 +-
 .../query/enumerator/LookupTableEnumerator.java    |  147 ---
 .../apache/kylin/query/enumerator/OLAPQuery.java   |    4 -
 .../apache/kylin/query/relnode/OLAPFilterRel.java  |    3 +-
 .../query/relnode/visitor/TupleFilterVisitor.java  |    5 +-
 .../kylin/rest/controller/CubeController.java      |   30 +-
 .../org/apache/kylin/rest/service/CubeService.java |   30 +-
 .../apache/kylin/rest/service/TableService.java    |   61 +-
 .../apache/kylin/source/hive/HiveInputBase.java    |   33 +-
 38 files changed, 1709 insertions(+), 2668 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
index bcf4b98..507898d 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/SortedColumnDFSFile.java
@@ -25,8 +25,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.dict.ByteComparator;
-import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
@@ -92,9 +90,9 @@ public class SortedColumnDFSFile implements IReadableTable {
     }
 
     private Comparator<String> getComparatorByType(DataType type) {
-        Comparator<String> comparator;
+        Comparator<String> comparator = null;
         if (!type.isNumberFamily()) {
-            comparator = new ByteComparator<>(new StringBytesConverter());
+//            comparator = new ByteComparator<>(new StringBytesConverter());
         } else if (type.isIntegerFamily()) {
             comparator = new Comparator<String>() {
                 @Override
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 4978fa0..c6c80f4 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -500,12 +500,6 @@ public class MapReduceExecutable extends AbstractExecutable {
         for (Map.Entry<String, String> entry : configOverride.getMRConfigOverride().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
-        if (conf.get("mapreduce.job.is-mem-hungry") != null
-                && Boolean.parseBoolean(conf.get("mapreduce.job.is-mem-hungry"))) {
-            for (Map.Entry<String, String> entry : configOverride.getMemHungryConfigOverride().entrySet()) {
-                conf.set(entry.getKey(), entry.getValue());
-            }
-        }
 
         if (StringUtils.isNotBlank(cubeName)) {
             remainingArgs.add("-" + BatchConstants.ARG_CUBE_NAME);
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index ecde4aa..4fa025b 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -18,147 +18,130 @@
 
 package org.apache.kylin.engine.mr.common;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Sets;
-
 public class MapReduceUtil {
-
-    private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
-
-    /**
-     * @return reducer number for calculating hll
-     */
-    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
-        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
-        int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
-
-        int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
-        if (shardBase > hllMaxReducerNumber) {
-            shardBase = hllMaxReducerNumber;
-        }
-        return shardBase;
-    }
-
-    /**
-     * @param cuboidScheduler specified can provide more flexibility
-     * */
-    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
-            double totalMapInputMB, int level)
-            throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        KylinConfig kylinConfig = cubeDesc.getConfig();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
-                + level);
-
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
-
-        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-
-        if (level == -1) {
-            //merge case
-            double estimatedSize = cubeStatsReader.estimateCubeSize();
-            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
-            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
-                    totalMapInputMB, adjustedCurrentLayerSizeEst);
-        } else if (level == 0) {
-            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
-            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-        } else {
-            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-            logger.debug(
-                    "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
-                    totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-        }
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-        if (cubeDesc.hasMemoryHungryMeasures()) {
-            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        return numReduceTasks;
-    }
-
-    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
-            throws IOException {
-        KylinConfig kylinConfig = cubeSeg.getConfig();
-
-        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
-        double totalSizeInM = 0;
-        for (Double cuboidSize : cubeSizeMap.values()) {
-            totalSizeInM += cuboidSize;
-        }
-        return getReduceTaskNum(totalSizeInM, kylinConfig);
-    }
-
-    // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
-    public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
-        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
-
-        Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
-        overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
-        overlapCuboids.add(baseCuboidId);
-
-        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
-                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
-        Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
-                cuboidStats.getSecond());
-        double totalSizeInM = 0;
-        for (Double cuboidSize : cubeSizeMap.values()) {
-            totalSizeInM += cuboidSize;
-        }
-
-        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
-
-        KylinConfig kylinConfig = cubeSeg.getConfig();
-        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
-        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
-        return new Pair<>(nBase + nOther, nBase);
-    }
-
-    private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        logger.info("Having total map input MB " + Math.round(totalSizeInM));
-        logger.info("Having per reduce MB " + perReduceInputMB);
-        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
-        return numReduceTasks;
-    }
+//
+//    private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
+//
+//    /**
+//     * @return reducer number for calculating hll
+//     */
+//    public static int getCuboidHLLCounterReducerNum(CubeInstance cube) {
+//        int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size();
+//        int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1;
+//
+//        int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber();
+//        if (shardBase > hllMaxReducerNumber) {
+//            shardBase = hllMaxReducerNumber;
+//        }
+//        return shardBase;
+//    }
+//
+//    /**
+//     * @param cuboidScheduler specified can provide more flexibility
+//     * */
+//    public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
+//            double totalMapInputMB, int level)
+//            throws ClassNotFoundException, IOException, InterruptedException, JobException {
+//        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+//        KylinConfig kylinConfig = cubeDesc.getConfig();
+//
+//        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+//        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+//        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+//                + level);
+//
+//        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+//
+//        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+//
+//        if (level == -1) {
+//            //merge case
+//            double estimatedSize = cubeStatsReader.estimateCubeSize();
+//            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+//            logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
+//                    totalMapInputMB, adjustedCurrentLayerSizeEst);
+//        } else if (level == 0) {
+//            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+//            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+//            logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+//        } else {
+//            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+//            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+//            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+//            logger.debug(
+//                    "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+//                    totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+//        }
+//
+//        // number of reduce tasks
+//        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+//
+//        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+//        if (cubeDesc.hasMemoryHungryMeasures()) {
+//            logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
+//            numReduceTasks = numReduceTasks * 4;
+//        }
+//
+//        // at least 1 reducer by default
+//        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+//        // no more than 500 reducer by default
+//        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+//
+//        return numReduceTasks;
+//    }
+//
+//    public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
+//            throws IOException {
+//        KylinConfig kylinConfig = cubeSeg.getConfig();
+//
+//        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+//        double totalSizeInM = 0;
+//        for (Double cuboidSize : cubeSizeMap.values()) {
+//            totalSizeInM += cuboidSize;
+//        }
+//        return getReduceTaskNum(totalSizeInM, kylinConfig);
+//    }
+//
+//    // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
+//    public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
+//        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
+//
+//        Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+//        overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
+//        overlapCuboids.add(baseCuboidId);
+//
+//        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
+//                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
+//        Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
+//                cuboidStats.getSecond());
+//        double totalSizeInM = 0;
+//        for (Double cuboidSize : cubeSizeMap.values()) {
+//            totalSizeInM += cuboidSize;
+//        }
+//
+//        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
+//
+//        KylinConfig kylinConfig = cubeSeg.getConfig();
+//        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
+//        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
+//        return new Pair<>(nBase + nOther, nBase);
+//    }
+//
+//    private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
+//        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+//        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+//
+//        // number of reduce tasks
+//        int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
+//
+//        // at least 1 reducer by default
+//        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+//        // no more than 500 reducer by default
+//        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+//
+//        logger.info("Having total map input MB " + Math.round(totalSizeInM));
+//        logger.info("Having per reduce MB " + perReduceInputMB);
+//        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+//        return numReduceTasks;
+//    }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 39f8eae..fc76506 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -292,6 +292,14 @@ public abstract class KylinConfigBase implements Serializable {
                 || "LOCAL".equals(getOptional("kylin.env", "DEV"));
     }
 
+    public boolean isUTEnv() {
+        return "UT".equals(getDeployEnv());
+    }
+
+    public boolean isLocalEnv() {
+        return "LOCAL".equals(getDeployEnv());
+    }
+
     public String getDeployEnv() {
         return getOptional("kylin.env", "DEV");
     }
@@ -343,7 +351,7 @@ public abstract class KylinConfigBase implements Serializable {
         String root = getOptional("kylin.env.hdfs-metastore-bigcell-dir");
 
         if (root == null) {
-            return getJdbcHdfsWorkingDirectory();
+            return getHdfsWorkingDirectory();
         }
 
         Path path = new Path(root);
@@ -352,12 +360,8 @@ public abstract class KylinConfigBase implements Serializable {
                     "kylin.env.hdfs-metastore-bigcell-dir must be absolute, but got " + root);
 
         // make sure path is qualified
-        try {
-            FileSystem fs = HadoopUtil.getReadFileSystem();
-            path = fs.makeQualified(path);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        path = fs.makeQualified(path);
 
         root = new Path(path, StringUtils.replaceChars(getMetadataUrlPrefix(), ':', '-')).toString();
 
@@ -374,28 +378,6 @@ public abstract class KylinConfigBase implements Serializable {
         return cachedBigCellDirectory;
     }
 
-    public String getReadHdfsWorkingDirectory() {
-        if (StringUtils.isNotEmpty(getHBaseClusterFs())) {
-            Path workingDir = new Path(getHdfsWorkingDirectory());
-            return new Path(getHBaseClusterFs(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
-        }
-
-        return getHdfsWorkingDirectory();
-    }
-
-    private String getJdbcHdfsWorkingDirectory() {
-        if (StringUtils.isNotEmpty(getJdbcFileSystem())) {
-            Path workingDir = new Path(getReadHdfsWorkingDirectory());
-            return new Path(getJdbcFileSystem(), Path.getPathWithoutSchemeAndAuthority(workingDir)).toString() + "/";
-        }
-
-        return getReadHdfsWorkingDirectory();
-    }
-
-    private String getJdbcFileSystem() {
-        return getOptional("kylin.storage.columnar.jdbc.file-system", "");
-    }
-
     public String getHdfsWorkingDirectory(String project) {
         if (isProjectIsolationEnabled() && project != null) {
             return new Path(getHdfsWorkingDirectory(), project).toString() + "/";
@@ -479,7 +461,7 @@ public abstract class KylinConfigBase implements Serializable {
         Map<String, String> r = Maps.newLinkedHashMap();
         // ref constants in ISourceAware
         r.put("", "org.apache.kylin.common.persistence.FileResourceStore");
-        r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
+//        r.put("hbase", "org.apache.kylin.storage.hbase.HBaseResourceStore");
         r.put("hdfs", "org.apache.kylin.common.persistence.HDFSResourceStore");
         r.put("ifile", "org.apache.kylin.common.persistence.IdentifierFileResourceStore");
         r.put("jdbc", "org.apache.kylin.common.persistence.JDBCResourceStore");
@@ -534,6 +516,7 @@ public abstract class KylinConfigBase implements Serializable {
         return (DistributedLockFactory) ClassUtil.newInstance(clsName);
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHBaseMappingAdapter() {
         return getOptional("kylin.metadata.hbasemapping-adapter");
     }
@@ -542,14 +525,17 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.metadata.check-copy-on-write", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseClientScannerTimeoutPeriod() {
         return getOptional("kylin.metadata.hbase-client-scanner-timeout-period", "10000");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseRpcTimeout() {
         return getOptional("kylin.metadata.hbase-rpc-timeout", "5000");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHbaseClientRetriesNumber() {
         return getOptional("kylin.metadata.hbase-client-retries-number", "1");
     }
@@ -562,120 +548,32 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.metadata.able-change-string-to-datetime", "false"));
     }
 
-    // ============================================================================
-    // DICTIONARY & SNAPSHOT
-    // ============================================================================
-
-    public boolean isUseForestTrieDictionary() {
-        return Boolean.parseBoolean(getOptional("kylin.dictionary.use-forest-trie", TRUE));
-    }
-
-    public long getTrieDictionaryForestMaxTrieSizeMB() {
-        return Integer.parseInt(getOptional("kylin.dictionary.forest-trie-max-mb", "500"));
-    }
-
-    public long getCachedDictMaxEntrySize() {
-        return Long.parseLong(getOptional("kylin.dictionary.max-cache-entry", "3000"));
-    }
-
-    public int getCachedDictMaxSize() {
-        return Integer.parseInt(getOptional("kylin.dictionary.max-cache-size", "-1"));
-    }
-
-    public boolean isGrowingDictEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.growing-enabled", FALSE));
-    }
-
-    public boolean isDictResuable() {
-        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.resuable", FALSE));
-    }
-
-    public long getCachedDictionaryMaxEntrySize() {
-        return Long.parseLong(getOptional("kylin.dictionary.cached-dict-max-cache-entry", "50000"));
-    }
-
-    public int getAppendDictEntrySize() {
-        return Integer.parseInt(getOptional("kylin.dictionary.append-entry-size", "10000000"));
-    }
-
-    public int getAppendDictMaxVersions() {
-        return Integer.parseInt(getOptional("kylin.dictionary.append-max-versions", "3"));
-    }
-
-    public int getAppendDictVersionTTL() {
-        return Integer.parseInt(getOptional("kylin.dictionary.append-version-ttl", "259200000"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getCachedSnapshotMaxEntrySize() {
-        return Integer.parseInt(getOptional("kylin.snapshot.max-cache-entry", "500"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getTableSnapshotMaxMB() {
-        return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public int getExtTableSnapshotShardingMB() {
-        return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public String getExtTableSnapshotLocalCachePath() {
-        return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public double getExtTableSnapshotLocalCacheMaxSizeGB() {
-        return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public long getExtTableSnapshotLocalCacheCheckVolatileRange() {
-        return Long.parseLong(getOptional("kylin.snapshot.ext.local.cache.check.volatile", "3600000"));
+    public String getMetadataDialect() {
+        return getOptional("kylin.metadata.jdbc.dialect", "mysql");
     }
 
-    @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.CUBE_LEVEL})
-    public boolean isShrunkenDictFromGlobalEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", TRUE));
+    public boolean isJsonAlwaysSmallCell() {
+        return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
     }
 
-
-    // ============================================================================
-    // Hive Global Dictionary
-    //
-    // ============================================================================
-
-    /**
-     * @return if mr-hive dict not enabled, return empty array
-     * else return array contains "{TABLE_NAME}_{COLUMN_NAME}"
-     */
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public String[] getMrHiveDictColumns() {
-        String columnStr = getOptional("kylin.dictionary.mr-hive.columns", "");
-        if (!columnStr.equals("")) {
-            return columnStr.split(",");
-        }
-        return new String[0];
+    public int getSmallCellMetadataWarningThreshold() {
+        return Integer.parseInt(
+                getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public String getMrHiveDictDB() {
-        return getOptional("kylin.dictionary.mr-hive.database", getHiveDatabaseForIntermediateTable());
+    public int getSmallCellMetadataErrorThreshold() {
+        return Integer.parseInt(
+                getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
     }
 
-    @ConfigTag(ConfigTag.Tag.DEPRECATED)
-    public String getMrHiveDictTableSuffix() {
-        return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
+    public int getJdbcResourceStoreMaxCellSize() {
+        return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
     }
 
     // ============================================================================
-    // Distributed/Spark Global dictionary
-    // Add wiki link here
+    // DICTIONARY & SNAPSHOT
     // ============================================================================
 
-
     public int getGlobalDictV2MinHashPartitions() {
         return Integer.parseInt(getOptional("kylin.dictionary.globalV2-min-hash-partitions", "10"));
     }
@@ -708,6 +606,42 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.dictionary.globalV2-check", "true"));
     }
 
+    /*
+     * Detect dataset skew in dictionary encode step.
+     * */
+    public boolean detectDataSkewInDictEncodingEnabled() {
+        return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
+    }
+
+    /*
+     * In some data skew cases, the repartition step during dictionary encoding will be slow.
+     * We can choose to sample from the dataset to detect skewed. This configuration is used to set the sample rate.
+     * */
+    public double sampleRateInEncodingSkewDetection() {
+        return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
+    }
+
+    /*
+     * In KYLIN4, dictionaries are hashed into several buckets, column data are repartitioned by the same hash algorithm
+     * during encoding step too. In data skew cases, the repartition step will be very slow. Kylin will automatically
+     * sample from the source to detect skewed data and repartition these skewed data to random partitions.
+     * This configuration is used to set the skew data threshhold, valued from 0 to 1.
+     * e.g.
+     *   if you set this value to 0.05, for each value that takes up more than 5% percent of the total will be regarded
+     *   as skew data, as a result the skewed data will be no more than 20 records
+     * */
+    public double skewPercentageThreshHold() {
+        return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
+    }
+
+    public boolean isSnapshotParallelBuildEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
+    }
+
+    public int snapshotParallelBuildTimeoutSeconds() {
+        return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
+    }
+
     // ============================================================================
     // CUBE
     // ============================================================================
@@ -1039,14 +973,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.job.error-record-threshold", "0"));
     }
 
-    public boolean isAdvancedFlatTableUsed() {
-        return Boolean.parseBoolean(getOptional("kylin.job.use-advanced-flat-table", FALSE));
-    }
-
-    public String getAdvancedFlatTableClass() {
-        return getOptional("kylin.job.advanced-flat-table.class");
-    }
-
     public String getJobTrackingURLPattern() {
         return getOptional("kylin.job.tracking-url-pattern", "");
     }
@@ -1106,7 +1032,7 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @Deprecated
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveUrl() {
         return getOptional("kylin.source.hive.connection-url", "");
     }
@@ -1114,7 +1040,7 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @Deprecated
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveUser() {
         return getOptional("kylin.source.hive.connection-user", "");
     }
@@ -1122,71 +1048,87 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * was for route to hive, not used any more
      */
-    @Deprecated
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHivePassword() {
         return getOptional("kylin.source.hive.connection-password", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getHiveConfigOverride() {
         return getPropertiesByPrefix("kylin.source.hive.config-override.");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("kylin.source.hive.table-location." + table.toUpperCase(Locale.ROOT));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isHiveKeepFlatTable() {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.keep-flat-table", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveDatabaseForIntermediateTable() {
         return CliCommandExecutor.checkHiveProperty(this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatTableStorageFormat() {
         return this.getOptional("kylin.source.hive.flat-table-storage-format", "SEQUENCEFILE");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatTableFieldDelimiter() {
         return this.getOptional("kylin.source.hive.flat-table-field-delimiter", "\u001F");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isHiveRedistributeEnabled() {
         return Boolean.parseBoolean(this.getOptional("kylin.source.hive.redistribute-flat-table", TRUE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveClientMode() {
         return getOptional("kylin.source.hive.client", "cli");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveBeelineShell() {
         return getOptional("kylin.source.hive.beeline-shell", "beeline");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveBeelineParams() {
         return getOptional("kylin.source.hive.beeline-params", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean getEnableSparkSqlForTableOps() {
         return Boolean.parseBoolean(getOptional("kylin.source.hive.enable-sparksql-for-table-ops", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkSqlBeelineShell() {
         return getOptional("kylin.source.hive.sparksql-beeline-shell", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkSqlBeelineParams() {
         return getOptional("kylin.source.hive.sparksql-beeline-params", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean getHiveTableDirCreateFirst() {
         return Boolean.parseBoolean(getOptional("kylin.source.hive.table-dir-create-first", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlatHiveTableClusterByDictColumn() {
         return getOptional("kylin.source.hive.flat-table-cluster-by-dict-column");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHiveRedistributeColumnCount() {
         return Integer.parseInt(getOptional("kylin.source.hive.redistribute-column-count", "3"));
     }
@@ -1243,7 +1185,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // SOURCE.KAFKA
+    // SOURCE.KAFKA(Removed)
     // ============================================================================
 
     @ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1252,7 +1194,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // SOURCE.JDBC
+    // SOURCE.JDBC(Removed)
     // ============================================================================
 
     @ConfigTag(ConfigTag.Tag.NOT_IMPLEMENTED)
@@ -1311,7 +1253,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // STORAGE.HBASE
+    // STORAGE.HBASE(Removed)
     // ============================================================================
 
     public Map<Integer, String> getStorageEngines() {
@@ -1521,7 +1463,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // ENGINE.MR
+    // ENGINE.MR(Removed)
     // ============================================================================
 
     public Map<Integer, String> getJobEngines() {
@@ -1540,8 +1482,6 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.default", "6"));
     }
 
-
-
     public String getKylinJobJarPath() {
         final String jobJar = getOptional(KYLIN_ENGINE_MR_JOB_JAR);
         if (StringUtils.isNotEmpty(jobJar)) {
@@ -1559,76 +1499,63 @@ public abstract class KylinConfigBase implements Serializable {
         System.setProperty(KYLIN_ENGINE_MR_JOB_JAR, path);
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getKylinJobMRLibDir() {
         return getOptional("kylin.engine.mr.lib-dir", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getMRConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.mr.config-override.");
     }
 
-    // used for some mem-hungry step
-    public Map<String, String> getMemHungryConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.mr.mem-hungry-config-override.");
-    }
-
-    public Map<String, String> getUHCMRConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override.");
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getHadoopJobMinReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
     }
 
-    public Map<String, String> getBaseCuboidMRConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override.");
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getHadoopJobMaxReducerNumber() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
     }
 
-    public Map<String, String> getSparkConfigOverride() {
-        return getPropertiesByPrefix("kylin.engine.spark-conf.");
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public int getHadoopJobMapperInputRows() {
+        return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getFlinkConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.flink-conf.");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
         return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
         return getPropertiesByPrefix("kylin.engine.flink-conf-" + configName + ".");
     }
 
-    public double getDefaultHadoopJobReducerInputMB() {
-        return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
-    }
-
-    public double getDefaultHadoopJobReducerCountRatio() {
-        return Double.parseDouble(getOptional("kylin.engine.mr.reduce-count-ratio", "1.0"));
-    }
-
-    public int getHadoopJobMinReducerNumber() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.min-reducer-number", "1"));
-    }
-
-    public int getHadoopJobMaxReducerNumber() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.max-reducer-number", "500"));
-    }
-
-    public int getHadoopJobMapperInputRows() {
-        return Integer.parseInt(getOptional("kylin.engine.mr.mapper-input-rows", "1000000"));
-    }
-
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getCuboidStatsCalculatorMaxNumber() {
         // set 1 to disable multi-thread statistics calculation
         return Integer.parseInt(getOptional("kylin.engine.mr.max-cuboid-stats-calculator-number", "1"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getCuboidNumberPerStatsCalculator() {
         return Integer.parseInt(getOptional("kylin.engine.mr.cuboid-number-per-stats-calculator", "100"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHadoopJobPerReducerHLLCuboidNumber() {
         return Integer.parseInt(getOptional("kylin.engine.mr.per-reducer-hll-cuboid-number", "100"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getHadoopJobHLLMaxReducerNumber() {
         // by default multi-reducer hll calculation is disabled
         return Integer.parseInt(getOptional("kylin.engine.mr.hll-max-reducer-number", "1"));
@@ -1639,22 +1566,27 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "3"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isBuildUHCDictWithMREnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict-in-additional-step", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isBuildDictInReducerEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", TRUE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getYarnStatusCheckUrl() {
         return getOptional("kylin.engine.mr.yarn-check-status-url", null);
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getYarnStatusCheckIntervalSeconds() {
         return Integer.parseInt(getOptional("kylin.engine.mr.yarn-check-interval-seconds", "10"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isUseLocalClasspathEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.mr.use-local-classpath", TRUE));
     }
@@ -1663,122 +1595,114 @@ public abstract class KylinConfigBase implements Serializable {
      * different version hive use different UNION style
      * https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Union
      */
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getHiveUnionStyle() {
         return getOptional("kylin.hive.union.style", "UNION");
     }
 
     // ============================================================================
-    // ENGINE.SPARK
+    // ENGINE.SPARK (DEPRECATED)
     // ============================================================================
 
     public String getHadoopConfDir() {
         return getOptional("kylin.env.hadoop-conf-dir", "");
     }
 
-    /**
-     * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
-     */
-    public String getSparderAppName() {
-        String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
-        if (StringUtils.isEmpty(customSparderAppName)) {
-            customSparderAppName =
-                    "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
-        }
-        return customSparderAppName;
-    }
-
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkAdditionalJars() {
         return getOptional("kylin.engine.spark.additional-jars", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getFlinkAdditionalJars() {
         return getOptional("kylin.engine.flink.additional-jars", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public float getSparkRDDPartitionCutMB() {
         return Float.parseFloat(getOptional("kylin.engine.spark.rdd-partition-cut-mb", "10.0"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public float getFlinkPartitionCutMB() {
         return Float.parseFloat(getOptional("kylin.engine.flink.partition-cut-mb", "10.0"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getSparkMinPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.min-partition", "1"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getFlinkMinPartition() {
         return Integer.parseInt(getOptional("kylin.engine.flink.min-partition", "1"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getSparkMaxPartition() {
         return Integer.parseInt(getOptional("kylin.engine.spark.max-partition", "5000"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getFlinkMaxPartition() {
         return Integer.parseInt(getOptional("kylin.engine.flink.max-partition", "5000"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getSparkStorageLevel() {
         return getOptional("kylin.engine.spark.storage-level", "MEMORY_AND_DISK_SER");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isSparkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark.sanity-check-enabled", FALSE));
     }
 
-    public boolean isSparkFactDistinctEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
-    }
-
-    public boolean isSparkUHCDictionaryEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-udc-dictionary", "false"));
-    }
-
-    public boolean isSparkCardinalityEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
-    }
-
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getSparkOutputMaxSize() {
         return Integer.valueOf(getOptional("kylin.engine.spark.output.max-size", "10485760"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isSparkDimensionDictionaryEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-dimension-dictionary", "false"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isFlinkSanityCheckEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.flink.sanity-check-enabled", FALSE));
     }
 
-    public boolean isSparCreateHiveTableViaSparkEnable() {
-        return Boolean.parseBoolean(getOptional("kylin.engine.spark-create-table-enabled", "false"));
-    }
-
     // ============================================================================
     // ENGINE.LIVY
     // ============================================================================
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isLivyEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.livy-conf.livy-enabled", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getLivyRestApiBacktick() {
         return getOptional("kylin.engine.livy.backtick.quote", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getLivyUrl() {
         return getOptional("kylin.engine.livy-conf.livy-url");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyKey() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-key.");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyArr() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-arr.");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getLivyMap() {
         return getPropertiesByPrefix("kylin.engine.livy-conf.livy-map.");
     }
@@ -1839,6 +1763,7 @@ public abstract class KylinConfigBase implements Serializable {
 
     // check KYLIN-3358, need deploy coprocessor if enabled
     // finally should be deprecated
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public boolean isDynamicColumnEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.query.enable-dynamic-column", FALSE));
     }
@@ -1893,11 +1818,13 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public long getQueryMaxScanBytes() {
         long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
         return value > 0 ? value : Long.MAX_VALUE;
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public long getQueryMaxReturnRows() {
         return Integer.parseInt(this.getOptional("kylin.query.max-return-rows", "5000000"));
     }
@@ -2015,6 +1942,7 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.max-dimension-count-distinct", "5000000"));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public Map<String, String> getUDFs() {
         Map<String, String> udfMap = Maps.newLinkedHashMap();
         udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
@@ -2047,10 +1975,12 @@ public abstract class KylinConfigBase implements Serializable {
         return this.getOptional("kylin.query.schema-factory", "org.apache.kylin.query.schema.OLAPSchemaFactory");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getPushDownRunnerClassName() {
         return getOptional("kylin.query.pushdown.runner-class-name", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public List<String> getPushDownRunnerIds() {
         List<String> ids = Lists.newArrayList();
         String idsStr = getOptional("kylin.query.pushdown.runner.ids", "");
@@ -2062,6 +1992,7 @@ public abstract class KylinConfigBase implements Serializable {
         return ids;
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String[] getPushDownConverterClassNames() {
         return getOptionalStringArray("kylin.query.pushdown.converter-class-names",
                 new String[]{"org.apache.kylin.source.adhocquery.HivePushDownConverter"});
@@ -2071,6 +2002,7 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.query.pushdown.cache-enabled", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcUrl(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.url", "");
@@ -2079,6 +2011,7 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcDriverClass(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.driver", "");
@@ -2087,6 +2020,7 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcUsername(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.username", "");
@@ -2095,6 +2029,7 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public String getJdbcPassword(String id) {
         if (null == id) {
             return getOptional("kylin.query.pushdown.jdbc.password", "");
@@ -2103,6 +2038,7 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMaxTotal(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2115,6 +2051,7 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMaxIdle(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2127,6 +2064,7 @@ public abstract class KylinConfigBase implements Serializable {
         }
     }
 
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
     public int getPoolMinIdle(String id) {
         if (null == id) {
             return Integer.parseInt(
@@ -2440,217 +2378,19 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.tool.auto-migrate-cube.dest-config", "");
     }
 
-    // ============================================================================
-    // jdbc metadata resource store
-    // ============================================================================
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public String getJdbcSourceAdaptor() {
+        return getOptional("kylin.source.jdbc.adaptor");
+    }
 
-    public String getMetadataDialect() {
-        return getOptional("kylin.metadata.jdbc.dialect", "mysql");
-    }
-
-    public boolean isJsonAlwaysSmallCell() {
-        return Boolean.parseBoolean(getOptional("kylin.metadata.jdbc.json-always-small-cell", TRUE));
-    }
-
-    public int getSmallCellMetadataWarningThreshold() {
-        return Integer.parseInt(
-                getOptional("kylin.metadata.jdbc.small-cell-meta-size-warning-threshold", String.valueOf(100 << 20))); //100mb
-    }
-
-    public int getSmallCellMetadataErrorThreshold() {
-        return Integer.parseInt(
-                getOptional("kylin.metadata.jdbc.small-cell-meta-size-error-threshold", String.valueOf(1 << 30))); // 1gb
-    }
-
-    public int getJdbcResourceStoreMaxCellSize() {
-        return Integer.parseInt(getOptional("kylin.metadata.jdbc.max-cell-size", "1048576")); // 1mb
-    }
-
-    public String getJdbcSourceAdaptor() {
-        return getOptional("kylin.source.jdbc.adaptor");
-    }
-
-    public boolean isLimitPushDownEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
+    @ConfigTag(ConfigTag.Tag.DEPRECATED)
+    public boolean isLimitPushDownEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.limit-push-down-enabled", TRUE));
     }
 
     // ============================================================================
-    // Realtime streaming
+    // Realtime streaming (Removed)
     // ============================================================================
-    public String getStreamingStoreClass() {
-        return getOptional("kylin.stream.store.class",
-                "org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore");
-    }
-
-    public String getStreamingBasicCuboidJobDFSBlockSize() {
-        return getOptional("kylin.stream.job.dfs.block.size", String.valueOf(16 * 1024 * 1024));
-    }
-
-    public String getStreamingIndexPath() {
-        return getOptional("kylin.stream.index.path", "stream_index");
-    }
-
-    public int getStreamingCubeConsumerTasksNum() {
-        return Integer.parseInt(getOptional("kylin.stream.cube-num-of-consumer-tasks", "3"));
-    }
-
-    public int getStreamingCubeWindowInSecs() {
-        return Integer.parseInt(getOptional("kylin.stream.cube.window", "3600"));
-    }
-
-    public int getStreamingCubeDurationInSecs() {
-        return Integer.parseInt(getOptional("kylin.stream.cube.duration", "7200"));
-    }
-
-    public int getStreamingCubeMaxDurationInSecs() {
-        return Integer.parseInt(getOptional("kylin.stream.cube.duration.max", "43200"));
-    }
-
-    public int getStreamingCheckPointFileMaxNum() {
-        return Integer.parseInt(getOptional("kylin.stream.checkpoint.file.max.num", "5"));
-    }
-
-    public int getStreamingCheckPointIntervalsInSecs() {
-        return Integer.parseInt(getOptional("kylin.stream.index.checkpoint.intervals", "300"));
-    }
-
-    public int getStreamingIndexMaxRows() {
-        return Integer.parseInt(getOptional("kylin.stream.index.maxrows", "50000"));
-    }
-
-    public int getStreamingMaxImmutableSegments() {
-        return Integer.parseInt(getOptional("kylin.stream.immutable.segments.max.num", "100"));
-    }
-
-    public boolean isStreamingConsumeFromLatestOffsets() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.consume.offsets.latest", "true"));
-    }
-
-    public String getStreamingNode() {
-        return getOptional("kylin.stream.node", null);
-    }
-
-    public Map<String, String> getStreamingNodeProperties() {
-        return getPropertiesByPrefix("kylin.stream.node");
-    }
-
-    public String getStreamingMetadataStoreType() {
-        return getOptional("kylin.stream.metadata.store.type", "zk");
-    }
-
-    public String getStreamingSegmentRetentionPolicy() {
-        return getOptional("kylin.stream.segment.retention.policy", "fullBuild");
-    }
-
-    public String getStreamingAssigner() {
-        return getOptional("kylin.stream.assigner", "DefaultAssigner");
-    }
-
-    public int getCoordinatorHttpClientTimeout() {
-        return Integer.parseInt(getOptional("kylin.stream.coordinator.client.timeout.millsecond", "5000"));
-    }
-
-    public int getReceiverHttpClientTimeout() {
-        return Integer.parseInt(getOptional("kylin.stream.receiver.client.timeout.millsecond", "5000"));
-    }
-
-    public int getStreamingReceiverHttpMaxThreads() {
-        return Integer.parseInt(getOptional("kylin.stream.receiver.http.max.threads", "200"));
-    }
-
-    public int getStreamingReceiverHttpMinThreads() {
-        return Integer.parseInt(getOptional("kylin.stream.receiver.http.min.threads", "10"));
-    }
-
-    public int getStreamingReceiverQueryCoreThreads() {
-        return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
-    }
-
-    public int getStreamingReceiverQueryMaxThreads() {
-        return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
-    }
-
-    public int getStreamingReceiverUseThreadsPerQuery() {
-        return Integer.parseInt(getOptional("kylin.stream.receiver.use-threads-per-query", "8"));
-    }
-
-    public int getStreamingRPCHttpConnTimeout() {
-        return Integer.parseInt(getOptional("kylin.stream.rpc.http.connect.timeout", "10000"));
-    }
-
-    public int getStreamingRPCHttpReadTimeout() {
-        return Integer.parseInt(getOptional("kylin.stream.rpc.http.read.timeout", "60000"));
-    }
-
-    public boolean isStreamingBuildAdditionalCuboids() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.build.additional.cuboids", "false"));
-    }
-
-    public Map<String, String> getStreamingSegmentRetentionPolicyProperties(String policyName) {
-        return getPropertiesByPrefix("kylin.stream.segment.retention.policy." + policyName + ".");
-    }
-
-    public int getStreamingMaxFragmentsInSegment() {
-        return Integer.parseInt(getOptional("kylin.stream.segment-max-fragments", "50"));
-    }
-
-    public int getStreamingMinFragmentsInSegment() {
-        return Integer.parseInt(getOptional("kylin.stream.segment-min-fragments", "15"));
-    }
-
-    public int getStreamingMaxFragmentSizeInMb() {
-        return Integer.parseInt(getOptional("kylin.stream.max-fragment-size-mb", "300"));
-    }
-
-    public boolean isStreamingFragmentsAutoMergeEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.fragments-auto-merge-enable", "true"));
-    }
-
-    public boolean isStreamingConcurrentScanEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.segment.concurrent.scan", "false"));
-    }
-
-    public boolean isStreamingStandAloneMode() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.stand-alone.mode", "false"));
-    }
-
-    public boolean isNewCoordinatorEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.new.coordinator-enabled", "true"));
-    }
-
-    public String getLocalStorageImpl() {
-        return getOptional("kylin.stream.settled.storage", null);
-    }
-
-    public String getStreamMetrics() {
-        return getOptional("kylin.stream.metrics.option", "");
-    }
-
-    /**
-     * whether to print encode integer value for count distinct string value, only for debug/test purpose
-     */
-    public boolean isPrintRealtimeDictEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.print-realtime-dict-enabled", "false"));
-    }
-
-    public long getStreamMetricsInterval() {
-        return Long.parseLong(getOptional("kylin.stream.metrics.interval", "5"));
-    }
-
-    /**
-     * whether realtime query should add timezone offset by kylin's web-timezone, please refer to KYLIN-4010 for detail
-     */
-    public String getStreamingDerivedTimeTimezone() {
-        return (getOptional("kylin.stream.event.timezone", ""));
-    }
-
-    public boolean isAutoResubmitDiscardJob() {
-        return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
-    }
-
-    public String getHiveDatabaseLambdaCube() {
-        return this.getOptional("kylin.stream.hive.database-for-lambda-cube", DEFAULT);
-    }
 
     // ============================================================================
     // Health Check CLI
@@ -2677,7 +2417,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
-    // Kylin 4.x related
+    // ENGINE.SPARK (Kylin 4)
     // ============================================================================
 
     public String getKylinParquetJobJarPath() {
@@ -2776,22 +2516,106 @@ public abstract class KylinConfigBase implements Serializable {
         return new Path(path);
     }
 
-    public boolean isSnapshotParallelBuildEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.snapshot.parallel-build-enabled", "true"));
+    public Map<String, String> getSparkConfigOverride() {
+        return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
 
-    public boolean isUTEnv() {
-        return "UT".equals(getDeployEnv());
+    public boolean isAutoSetSparkConf() {
+        return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
     }
 
-    public boolean isLocalEnv() {
-        return "LOCAL".equals(getDeployEnv());
+    public String getBuildConf() {
+        return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
     }
 
-    public int snapshotParallelBuildTimeoutSeconds() {
-        return Integer.parseInt(getOptional("kylin.snapshot.parallel-build-timeout-seconds", "3600"));
+    public boolean isJobLogPrintEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+    public String getClusterInfoFetcherClassName() {
+        return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
+                "org.apache.kylin.cluster.YarnInfoFetcher");
+    }
+
+    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
+    public String getSparkMergeClassName() {
+        return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
+    }
+
+    public String getParentDatasetStorageLevel() {
+        return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
+    }
+
+    public int getMaxParentDatasetPersistCount() {
+        return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
+    }
+
+    public int getRepartitionNumAfterEncode() {
+        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
+    }
+
+
+    public int getSparkEngineMaxRetryTime() {
+        return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
+    }
+
+    public double getSparkEngineRetryMemoryGradient() {
+        return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
+    }
+
+    public double getSparkEngineRetryOverheadMemoryGradient() {
+        return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
+    }
+
+    public Double getMaxAllocationResourceProportion() {
+        return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
+    }
+
+    public int getSparkEngineBaseExecutorInstances() {
+        return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
+    }
+
+    public String getSparkEngineRequiredTotalCores() {
+        return getOptional("kylin.engine.spark.required-cores", "1");
+    }
+
+    public String getSparkEngineExecutorInstanceStrategy() {
+        return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
+    }
+
+    public int getSnapshotShardSizeMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+    }
+
+    /***
+     * Global dictionary will be split into several buckets. To encode a column to int value more
+     * efficiently, source dataset will be repartitioned by the to-be encoded column to the same
+     * amount of partitions as the dictionary's bucket size.
+     *
+     * It sometimes bring side effect, because repartitioning by a single column is more likely to cause
+     * serious data skew, causing one task takes the majority of time in first layer's cuboid building.
+     *
+     * When faced with this case, you can try repartitioning encoded dataset by all
+     * RowKey columns to avoid data skew. The repartition size is default to max bucket
+     * size of all dictionaries, but you can also set to other flexible value by this option:
+     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
+     ***/
+    public boolean rePartitionEncodedDatasetWithRowKey() {
+        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
+    }
+
+    /**
+     * If we should calculate cuboid statistics for each segment, which is needed for cube planner phase two
+     */
+    public boolean isSegmentStatisticsEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
     }
 
+    // ============================================================================
+    // STORAGE.PARQUET
+    // ============================================================================
+
     public String getStorageProvider() {
         return getOptional("kylin.storage.provider", "org.apache.kylin.common.storage.DefaultStorageProvider");
     }
@@ -2842,61 +2666,21 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.valueOf(getOptional("kylin.storage.columnar.dfs-replication", "3"));
     }
 
-    public boolean isAutoSetSparkConf() {
-        return Boolean.parseBoolean(getOptional("kylin.spark-conf.auto.prior", "true"));
-    }
-
-    public String getBuildConf() {
-        return getOptional("kylin.engine.submit-hadoop-conf-dir", "");
-    }
-
-    public boolean isJobLogPrintEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.job.log-print-enabled", "true"));
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
-    public String getClusterInfoFetcherClassName() {
-        return getOptional("kylin.engine.spark.cluster-info-fetcher-class-name",
-                "org.apache.kylin.cluster.YarnInfoFetcher");
-    }
-
-    @ConfigTag(ConfigTag.Tag.DEBUG_HACK)
-    public String getSparkMergeClassName() {
-        return getOptional("kylin.engine.spark.merge-class-name", "org.apache.kylin.engine.spark.job.CubeMergeJob");
-    }
-
-    public int getSparkEngineMaxRetryTime() {
-        return Integer.parseInt(getOptional("kylin.engine.max-retry-time", "3"));
-    }
-
-    public double getSparkEngineRetryMemoryGradient() {
-        return Double.parseDouble(getOptional("kylin.engine.retry-memory-gradient", "1.5"));
-    }
-
-    public double getSparkEngineRetryOverheadMemoryGradient() {
-        return Double.parseDouble(getOptional("kylin.engine.retry-overheadMemory-gradient", "0.2"));
-    }
-
-    public Double getMaxAllocationResourceProportion() {
-        return Double.parseDouble(getOptional("kylin.engine.max-allocation-proportion", "0.9"));
-    }
-
-    public int getSparkEngineBaseExecutorInstances() {
-        return Integer.parseInt(getOptional("kylin.engine.base-executor-instance", "5"));
-    }
-
-    public String getSparkEngineRequiredTotalCores() {
-        return getOptional("kylin.engine.spark.required-cores", "1");
-    }
-
-    public String getSparkEngineExecutorInstanceStrategy() {
-        return getOptional("kylin.engine.executor-instance-strategy", "100,2,500,3,1000,4");
-    }
+    // ============================================================================
+    // Query Engine (Sparder)
+    // ============================================================================
 
-    public int getSnapshotShardSizeMB() {
-        return Integer.parseInt(getOptional("kylin.snapshot.shard-size-mb", "128"));
+    /**
+     * Get the sparder app name, default value is: 'sparder_on_localhost-7070'
+     */
+    public String getSparderAppName() {
+        String customSparderAppName = getOptional("kylin.query.sparder-context.app-name", "");
+        if (StringUtils.isEmpty(customSparderAppName)) {
+            customSparderAppName =
+                    "sparder_on_" + getServerRestAddress().replaceAll(":", "-");
+        }
+        return customSparderAppName;
     }
-
     /**
      * driver memory that can be used by join(mostly BHJ)
      */
@@ -3068,25 +2852,22 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
     }
 
-    /**
-     * If we should calculate cuboid statistics for each segment, which is needed for cube planner phase two
-     */
-    public boolean isSegmentStatisticsEnabled() {
-        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
-    }
 
     // ============================================================================
     // Spark with Kerberos
     // ============================================================================
 
+    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public Boolean isKerberosEnabled() {
         return Boolean.valueOf(getOptional("kylin.kerberos.enabled", FALSE));
     }
 
+    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosKeytab() {
         return getOptional("kylin.kerberos.keytab", "");
     }
 
+    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosKeytabPath() {
         return KylinConfig.getKylinConfDir() + File.separator + getKerberosKeytab();
     }
@@ -3136,64 +2917,8 @@ public abstract class KylinConfigBase implements Serializable {
         return KylinConfig.getKylinConfDir() + File.separator + getKerberosJaasConf();
     }
 
+    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public String getKerberosPrincipal() {
         return getOptional("kylin.kerberos.principal");
     }
-
-    public String getParentDatasetStorageLevel() {
-        return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE");
-    }
-
-    public int getMaxParentDatasetPersistCount() {
-        return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1"));
-    }
-
-    public int getRepartitionNumAfterEncode() {
-        return Integer.valueOf(getOptional("kylin.engine.spark.dataset.repartition.num.after.encoding", "0"));
-    }
-
-    /***
-     * Global dictionary will be split into several buckets. To encode a column to int value more
-     * efficiently, source dataset will be repartitioned by the to-be encoded column to the same
-     * amount of partitions as the dictionary's bucket size.
-     *
-     * It sometimes bring side effect, because repartitioning by a single column is more likely to cause
-     * serious data skew, causing one task takes the majority of time in first layer's cuboid building.
-     *
-     * When faced with this case, you can try repartitioning encoded dataset by all
-     * RowKey columns to avoid data skew. The repartition size is default to max bucket
-     * size of all dictionaries, but you can also set to other flexible value by this option:
-     * 'kylin.engine.spark.dataset.repartition.num.after.encoding'
-     ***/
-    public boolean rePartitionEncodedDatasetWithRowKey() {
-        return Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", "false"));
-    }
-
-    /*
-     * Detect dataset skew in dictionary encode step.
-     * */
-    public boolean detectDataSkewInDictEncodingEnabled() {
-        return Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", "false"));
-    }
-
-    /*
-    * In some data skew cases, the repartition step during dictionary encoding will be slow.
-    * We can choose to sample from the dataset to detect skewed. This configuration is used to set the sample rate.
-    * */
-    public double sampleRateInEncodingSkewDetection() {
-        return Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", "0.1"));
-    }
-
-    /*
-    * In KYLIN4, dictionaries are hashed into several buckets, column data are repartitioned by the same hash algorithm
-    * during encoding step too. In data skew cases, the repartition step will be very slow. Kylin will automatically
-    * sample from the source to detect skewed data and repartition these skewed data to random partitions.
-    * This configuration is used to set the skew data threshhold, valued from 0 to 1.
-    * e.g.
-    *   if you set this value to 0.05, for each value that takes up more than 5% percent of the total will be regarded
-    *   as skew data, as a result the skewed data will be no more than 20 records
-    * */
-    public double skewPercentageThreshHold() {
-        return Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", "0.05"));
-    }
 }
diff --git a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
index 53a6967..965d252 100644
--- a/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
+++ b/core-common/src/main/java/org/apache/kylin/common/annotation/ConfigTag.java
@@ -39,6 +39,9 @@ public @interface ConfigTag {
          */
         DEPRECATED,
 
+        /**
+         * Not tested/verified
+         */
         NOT_CLEAR,
 
         /**
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index e5facf4..9b782de 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -101,10 +101,6 @@ public class HadoopUtil {
         return getFileSystem(workingPath, conf);
     }
 
-    public static FileSystem getReadFileSystem() throws IOException {
-        return getFileSystem(KylinConfig.getInstanceFromEnv().getReadHdfsWorkingDirectory());
-    }
-
     public static FileSystem getFileSystem(String path) {
         return getFileSystem(new Path(makeURI(path)));
     }
diff --git a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
index b4ac16b..29f92a6 100644
--- a/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/KylinConfigTest.java
@@ -76,16 +76,16 @@ public class KylinConfigTest extends HotLoadKylinPropertiesTestCase {
         assertEquals("1234", configExt.getOptional("1234"));
     }
 
-    @Test
-    public void testPropertiesHotLoad() {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
-
-        updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
-        KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
-
-        assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
-    }
+//    @Test
+//    public void testPropertiesHotLoad() {
+//        KylinConfig config = KylinConfig.getInstanceFromEnv();
+//        assertEquals("whoami@kylin.apache.org", config.getKylinOwner());
+//
+//        updateProperty("kylin.storage.hbase.owner-tag", "kylin@kylin.apache.org");
+//        KylinConfig.getInstanceFromEnv().reloadFromSiteProperties();
+//
+//        assertEquals("kylin@kylin.apache.org", config.getKylinOwner());
+//    }
 
     @Test
     public void testGetMetadataUrlPrefix() {
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index c024c06..91be01b 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -38,10 +38,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-metadata</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-dictionary</artifactId>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>org.apache.kylin</groupId>-->
+<!--            <artifactId>kylin-core-dictionary</artifactId>-->
+<!--        </dependency>-->
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-shaded-guava</artifactId>
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 585a37a..3e99268 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -27,7 +27,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,34 +40,23 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.persistence.WriteConflictException;
 import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDescTiretreeGlobalDomainDictUtil;
 import org.apache.kylin.cube.model.SnapshotTableDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ILookupTable;
-import org.apache.kylin.dict.lookup.LookupProviderFactory;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentRange.TSRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -77,8 +65,6 @@ import org.apache.kylin.metadata.realization.IRealizationProvider;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,7 +115,6 @@ public class CubeManager implements IRealizationProvider {
 
     // a few inner classes to group related methods
     private SegmentAssist segAssist = new SegmentAssist();
-    private DictionaryAssist dictAssist = new DictionaryAssist();
 
     private Random ran = new Random();
 
@@ -556,27 +541,24 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        String tableName = join.getPKSide().getTableIdentity();
-        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName);
-        return getInMemLookupTable(cubeSegment, join, snapshotTableDesc);
+        return null;
     }
 
-    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
-            SnapshotTableDesc snapshotTableDesc) {
-        String tableName = join.getPKSide().getTableIdentity();
-        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
-        String[] pkCols = join.getPrimaryKey();
-
-        try {
-            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
-            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
-        }
-    }
+//    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join,
+//            SnapshotTableDesc snapshotTableDesc) {
+//        String tableName = join.getPKSide().getTableIdentity();
+//        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+//        String[] pkCols = join.getPrimaryKey();
+//
+//        try {
+//            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+//            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+//            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
+//        } catch (IOException e) {
+//            throw new IllegalStateException(
+//                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+//        }
+//    }
 
     private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
         String snapshotResPath;
@@ -594,20 +576,11 @@ public class CubeManager implements IRealizationProvider {
 
     @VisibleForTesting
     /*private*/ String generateStorageLocation(int engineType) {
-        String namePrefix = config.getHBaseTableNamePrefix();
-        String namespace = config.getHBaseStorageNameSpace();
         String tableName = "";
         do {
             StringBuffer sb = new StringBuffer();
             int identifierLength = HBASE_TABLE_LENGTH;
-            if (engineType != IEngineAware.ID_SPARK_II) {
-                if ((namespace.equals("default") || namespace.equals("")) == false) {
-                    sb.append(namespace).append(":");
-                }
-                sb.append(namePrefix);
-            } else {
                 identifierLength = PARQUET_IDENTIFIER_LENGTH;
-            }
             for (int i = 0; i < identifierLength; i++) {
                 sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
             }
@@ -628,14 +601,6 @@ public class CubeManager implements IRealizationProvider {
         return TableMetadataManager.getInstance(config);
     }
 
-    private DictionaryManager getDictionaryManager() {
-        return DictionaryManager.getInstance(config);
-    }
-
-    private SnapshotManager getSnapshotManager() {
-        return SnapshotManager.getInstance(config);
-    }
-
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }
@@ -1125,143 +1090,143 @@ public class CubeManager implements IRealizationProvider {
     // Dictionary/Snapshot related methods
     // ============================================================================
 
-    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-            throws IOException {
-        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
-    }
-
-    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-            Dictionary<String> dict) throws IOException {
-        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
-    }
-
-    /**
-     * return null if no dictionary for given column
-     */
-    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-        return dictAssist.getDictionary(cubeSeg, col);
-    }
-
-    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
-        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
-    }
-
-    private TableMetadataManager getMetadataManager() {
-        return TableMetadataManager.getInstance(config);
-    }
-
-    private class DictionaryAssist {
-        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-                throws IOException {
-            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-                return null;
-
-            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
-            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
-
-            saveDictionaryInfo(cubeSeg, col, dictInfo);
-            return dictInfo;
-        }
-
-        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-                Dictionary<String> dict) throws IOException {
-            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-                return null;
-
-            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
-
-            saveDictionaryInfo(cubeSeg, col, dictInfo);
-            return dictInfo;
-        }
-
-        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
-                throws IOException {
-            if (dictInfo == null)
-                return;
-
-            // work on copy instead of cached objects
-            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-
-            Dictionary<?> dict = dictInfo.getDictionaryObject();
-            segCopy.putDictResPath(col, dictInfo.getResourcePath());
-            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
-
-            CubeUpdate update = new CubeUpdate(cubeCopy);
-            update.setToUpdateSegs(segCopy);
-            updateCube(update);
-        }
-
-        /**
-         * return null if no dictionary for given column
-         */
-        @SuppressWarnings("unchecked")
-        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-            DictionaryInfo info = null;
-            String dictResPath = null;
-            try {
-                DictionaryManager dictMgr = getDictionaryManager();
-
-                //tiretree global domain dic
-                List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
-                if (!globalDicts.isEmpty()) {
-                    dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
-                }
-
-                if (Objects.isNull(dictResPath)){
-                    dictResPath = cubeSeg.getDictResPath(col);
-                }
-
-                if (dictResPath == null)
-                    return null;
-
-                info = dictMgr.getDictionaryInfo(dictResPath);
-                if (info == null)
-                    throw new IllegalStateException("No dictionary found by " + dictResPath
-                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
-                        e);
-            }
-            return (Dictionary<String>) info.getDictionaryObject();
-        }
-
-        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
-                throws IOException {
-            // work on copy instead of cached objects
-            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
-            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
-
-            TableMetadataManager metaMgr = getTableManager();
-            SnapshotManager snapshotMgr = getSnapshotManager();
-
-            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
-            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
-            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
-
-            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
-                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-                CubeUpdate update = new CubeUpdate(cubeCopy);
-                update.setToUpdateSegs(segCopy);
-                updateCube(update);
-
-                // Update the input cubeSeg after the resource store updated
-                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
-            } else {
-                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
-                Map<String, String> map = Maps.newHashMap();
-                map.put(lookupTable, snapshot.getResourcePath());
-                cubeUpdate.setUpdateTableSnapshotPath(map);
-                updateCube(cubeUpdate);
-
-                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-            }
-            return snapshot;
-        }
-    }
+//    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+//            throws IOException {
+//        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
+//    }
+//
+//    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+//            Dictionary<String> dict) throws IOException {
+//        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
+//    }
+//
+//    /**
+//     * return null if no dictionary for given column
+//     */
+//    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+//        return dictAssist.getDictionary(cubeSeg, col);
+//    }
+//
+//    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid) throws IOException {
+//        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable, uuid);
+//    }
+//
+//    private TableMetadataManager getMetadataManager() {
+//        return TableMetadataManager.getInstance(config);
+//    }
+//
+//    private class DictionaryAssist {
+//        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+//                throws IOException {
+//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+//            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+//                return null;
+//
+//            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+//            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
+//
+//            saveDictionaryInfo(cubeSeg, col, dictInfo);
+//            return dictInfo;
+//        }
+//
+//        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+//                Dictionary<String> dict) throws IOException {
+//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+//            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+//                return null;
+//
+//            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
+//
+//            saveDictionaryInfo(cubeSeg, col, dictInfo);
+//            return dictInfo;
+//        }
+//
+//        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
+//                throws IOException {
+//            if (dictInfo == null)
+//                return;
+//
+//            // work on copy instead of cached objects
+//            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+//            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+//
+//            Dictionary<?> dict = dictInfo.getDictionaryObject();
+//            segCopy.putDictResPath(col, dictInfo.getResourcePath());
+//            segCopy.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
+//
+//            CubeUpdate update = new CubeUpdate(cubeCopy);
+//            update.setToUpdateSegs(segCopy);
+//            updateCube(update);
+//        }
+//
+//        /**
+//         * return null if no dictionary for given column
+//         */
+//        @SuppressWarnings("unchecked")
+//        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+//            DictionaryInfo info = null;
+//            String dictResPath = null;
+//            try {
+//                DictionaryManager dictMgr = getDictionaryManager();
+//
+//                //tiretree global domain dic
+//                List<CubeDescTiretreeGlobalDomainDictUtil.GlobalDict> globalDicts = cubeSeg.getCubeDesc().listDomainDict();
+//                if (!globalDicts.isEmpty()) {
+//                    dictResPath = CubeDescTiretreeGlobalDomainDictUtil.globalReuseDictPath(cubeSeg.getConfig(), col, cubeSeg.getCubeDesc());
+//                }
+//
+//                if (Objects.isNull(dictResPath)){
+//                    dictResPath = cubeSeg.getDictResPath(col);
+//                }
+//
+//                if (dictResPath == null)
+//                    return null;
+//
+//                info = dictMgr.getDictionaryInfo(dictResPath);
+//                if (info == null)
+//                    throw new IllegalStateException("No dictionary found by " + dictResPath
+//                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
+//            } catch (IOException e) {
+//                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
+//                        e);
+//            }
+//            return (Dictionary<String>) info.getDictionaryObject();
+//        }
+//
+//        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable, String uuid)
+//                throws IOException {
+//            // work on copy instead of cached objects
+//            CubeInstance cubeCopy = cubeSeg.getCubeInstance().latestCopyForWrite(); // get a latest copy
+//            CubeSegment segCopy = cubeCopy.getSegmentById(cubeSeg.getUuid());
+//
+//            TableMetadataManager metaMgr = getTableManager();
+//            SnapshotManager snapshotMgr = getSnapshotManager();
+//
+//            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, segCopy.getProject()));
+//            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, uuid);
+//            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc, cubeSeg.getConfig());
+//
+//            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+//            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
+//                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+//                CubeUpdate update = new CubeUpdate(cubeCopy);
+//                update.setToUpdateSegs(segCopy);
+//                updateCube(update);
+//
+//                // Update the input cubeSeg after the resource store updated
+//                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
+//            } else {
+//                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+//                Map<String, String> map = Maps.newHashMap();
+//                map.put(lookupTable, snapshot.getResourcePath());
+//                cubeUpdate.setUpdateTableSnapshotPath(map);
+//                updateCube(cubeUpdate);
+//
+//                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+//            }
+//            return snapshot;
+//        }
+//    }
 
     /**
      * To keep "select * from LOOKUP_TABLE" has consistent and latest result, we manually choose
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index c32da70..5205cc5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -371,26 +371,24 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
         this.storageLocationIdentifier = storageLocationIdentifier;
     }
 
-    public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
-        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-        for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
-            result.put(col, (Dictionary<String>) getDictionary(col));
-        }
-        return result;
-    }
-
-    public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
-        Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
-        for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
-            result.put(col, getDictionary(col));
-        }
-        return result;
-    }
+//    public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
+//        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+//        for (TblColRef col : getCubeDesc().getAllColumnsHaveDictionary()) {
+//            result.put(col, (Dictionary<String>) getDictionary(col));
+//        }
+//        return result;
+//    }
+//
+//    public Map<TblColRef, Dictionary<String>> buildGlobalDictionaryMap(int globalColumnsSize) {
+//        Map<TblColRef, Dictionary<String>> result = Maps.newHashMapWithExpectedSize(globalColumnsSize);
+//        for (TblColRef col : getCubeDesc().getAllGlobalDictColumns()) {
+//            result.put(col, getDictionary(col));
+//        }
+//        return result;
+//    }
 
     public Dictionary<String> getDictionary(TblColRef col) {
-        TblColRef reuseCol = getCubeDesc().getDictionaryReuseColumn(col);
-        CubeManager cubeMgr = CubeManager.getInstance(this.getCubeInstance().getConfig());
-        return cubeMgr.getDictionary(this, reuseCol);
+        return null;
     }
 
     public CubeDimEncMap getDimensionEncodingMap() {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java b/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
similarity index 96%
rename from core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
rename to core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
index dccb7c4..794b1e5 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/ILookupTable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.dict.lookup;
+package org.apache.kylin.cube;
 
 import org.apache.kylin.common.util.Array;
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
deleted file mode 100644
index 0815942..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.cli;
-
-import java.io.IOException;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryInfoSerializer;
-import org.apache.kylin.dict.DictionaryProvider;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.dict.lookup.ILookupTable;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.IReadableTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-
-public class DictionaryGeneratorCLI {
-
-    private DictionaryGeneratorCLI() {
-    }
-
-    private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
-
-    public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid,
-            DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeSegment segment = cube.getSegmentById(segmentID);
-
-        int retryTime = 0;
-        while (retryTime < 3) {
-            if (retryTime > 0) {
-                logger.info("Rebuild dictionary and snapshot for Cube: {}, Segment: {}, {} times.", cubeName, segmentID,
-                        retryTime);
-            }
-
-            processSegment(config, segment, uuid, factTableValueProvider, dictProvider);
-
-            if (isAllDictsAndSnapshotsReady(config, cubeName, segmentID)) {
-                break;
-            }
-            retryTime++;
-        }
-
-        if (retryTime >= 3) {
-            logger.error("Not all dictionaries and snapshots ready for cube segment: {}", segmentID);
-        } else {
-            logger.info("Succeed to build all dictionaries and snapshots for cube segment: {}", segmentID);
-        }
-    }
-
-    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String uuid,
-            DistinctColumnValuesProvider factTableValueProvider, DictionaryProvider dictProvider) throws IOException {
-        CubeManager cubeMgr = CubeManager.getInstance(config);
-
-        // dictionary
-        for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
-            logger.info("Building dictionary for {}", col);
-            IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col);
-
-            Dictionary<String> preBuiltDict = null;
-            if (dictProvider != null) {
-                preBuiltDict = dictProvider.getDictionary(col);
-            }
-
-            if (preBuiltDict != null) {
-                logger.debug("Dict for '{}' has already been built, save it", col.getName());
-                cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict);
-            } else {
-                logger.debug("Dict for '{}' not pre-built, build it from {}", col.getName(), inpTable);
-                cubeMgr.buildDictionary(cubeSeg, col, inpTable);
-            }
-        }
-
-        // snapshot
-        Set<String> toSnapshot = Sets.newHashSet();
-        Set<TableRef> toCheckLookup = Sets.newHashSet();
-        for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
-            TableRef table = dim.getTableRef();
-            if (cubeSeg.getModel().isLookupTable(table)) {
-                // only the snapshot desc is not ext type, need to take snapshot
-                toSnapshot.add(table.getTableIdentity());
-                toCheckLookup.add(table);
-            }
-        }
-
-        for (String tableIdentity : toSnapshot) {
-            logger.info("Building snapshot of {}", tableIdentity);
-            cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid);
-        }
-
-        CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName());
-        cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid());
-        for (TableRef lookup : toCheckLookup) {
-            logger.info("Checking snapshot of {}", lookup);
-            try {
-                JoinDesc join = cubeSeg.getModel().getJoinsTree().getJoinByPKSide(lookup);
-                ILookupTable table = cubeMgr.getLookupTable(cubeSeg, join);
-                if (table != null) {
-                    IOUtils.closeStream(table);
-                }
-            } catch (Throwable th) {
-                throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
-            }
-        }
-    }
-
-    private static boolean isAllDictsAndSnapshotsReady(KylinConfig config, String cubeName, String segmentID) {
-        CubeInstance cube = CubeManager.getInstance(config).reloadCube(cubeName);
-        CubeSegment segment = cube.getSegmentById(segmentID);
-        ResourceStore store = ResourceStore.getStore(config);
-
-        // check dicts
-        logger.info("Begin to check if all dictionaries exist of Segment: {}", segmentID);
-        Map<String, String> dictionaries = segment.getDictionaries();
-        for (Map.Entry<String, String> entry : dictionaries.entrySet()) {
-            String dictResPath = entry.getValue();
-            String dictKey = entry.getKey();
-            try {
-                DictionaryInfo dictInfo = store.getResource(dictResPath, DictionaryInfoSerializer.INFO_SERIALIZER);
-                if (dictInfo == null) {
-                    logger.warn("Dictionary=[key: {}, resource path: {}] doesn't exist in resource store", dictKey,
-                            dictResPath);
-                    return false;
-                }
-            } catch (IOException e) {
-                logger.warn("Dictionary=[key: {}, path: {}] failed to check, details: {}", dictKey, dictResPath, e);
-                return false;
-            }
-        }
-
-        // check snapshots
-        logger.info("Begin to check if all snapshots exist of Segment: {}", segmentID);
-        Map<String, String> snapshots = segment.getSnapshots();
-        for (Map.Entry<String, String> entry : snapshots.entrySet()) {
-            String snapshotKey = entry.getKey();
-            String snapshotResPath = entry.getValue();
-            try {
-                SnapshotTable snapshot = store.getResource(snapshotResPath, SnapshotTableSerializer.INFO_SERIALIZER);
-                if (snapshot == null) {
-                    logger.info("SnapshotTable=[key: {}, resource path: {}] doesn't exist in resource store",
-                            snapshotKey, snapshotResPath);
-                    return false;
-                }
-            } catch (IOException e) {
-                logger.warn("SnapshotTable=[key: {}, resource path: {}]  failed to check, details: {}", snapshotKey,
-                        snapshotResPath, e);
-                return false;
-            }
-        }
-
-        logger.info("All dictionaries and snapshots exist checking succeed for Cube Segment: {}", segmentID);
-        return true;
-    }
-}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
deleted file mode 100644
index 729a6da..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DumpDictionaryCLI.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.cli;
-
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Date;
-
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryInfoSerializer;
-
-public class DumpDictionaryCLI {
-
-    public static void main(String[] args) throws IOException {
-        for (String path : args) {
-            dump(new File(path));
-        }
-    }
-
-    public static void dump(File f) throws IOException {
-        if (f.isDirectory()) {
-            File[] files = f.listFiles();
-            if (files == null) {
-                return;
-            }
-            for (File c : files)
-                dump(c);
-            return;
-        }
-
-        if (f.getName().endsWith(".dict")) {
-            DictionaryInfoSerializer ser = new DictionaryInfoSerializer();
-            DictionaryInfo dictInfo = ser.deserialize(new DataInputStream(new FileInputStream(f)));
-
-            System.out.println("============================================================================");
-            System.out.println("File: " + f.getAbsolutePath());
-            System.out.println(new Date(dictInfo.getLastModified()));
-            System.out.println(JsonUtil.writeValueAsIndentString(dictInfo));
-            dictInfo.getDictionaryObject().dump(System.out);
-            System.out.println();
-        }
-    }
-}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index c0c6882..6bf67db 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -60,8 +60,6 @@ import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.dict.GlobalDictionaryBuilder;
-import org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
 import org.apache.kylin.metadata.MetadataConstants;
@@ -1559,30 +1557,30 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         return null;
     }
 
-    public List<TblColRef> getAllGlobalDictColumns() {
-        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
-        List<DictionaryDesc> dictionaryDescList = getDictionaries();
-
-        if (dictionaryDescList == null) {
-            return globalDictCols;
-        }
-
-        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
-            String cls = dictionaryDesc.getBuilderClass();
-            if (GlobalDictionaryBuilder.class.getName().equals(cls)
-                    || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
-                globalDictCols.add(dictionaryDesc.getColumnRef());
-        }
-        return globalDictCols;
-    }
+//    public List<TblColRef> getAllGlobalDictColumns() {
+//        List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
+//        List<DictionaryDesc> dictionaryDescList = getDictionaries();
+//
+//        if (dictionaryDescList == null) {
+//            return globalDictCols;
+//        }
+//
+//        for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+//            String cls = dictionaryDesc.getBuilderClass();
+//            if (GlobalDictionaryBuilder.class.getName().equals(cls)
+//                    || SegmentAppendTrieDictBuilder.class.getName().equals(cls))
+//                globalDictCols.add(dictionaryDesc.getColumnRef());
+//        }
+//        return globalDictCols;
+//    }
 
     // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
-    public List<TblColRef> getAllUHCColumns() {
-        List<TblColRef> uhcColumns = new ArrayList<>();
-        uhcColumns.addAll(getAllGlobalDictColumns());
-        uhcColumns.addAll(getShardByColumns());
-        return uhcColumns;
-    }
+//    public List<TblColRef> getAllUHCColumns() {
+//        List<TblColRef> uhcColumns = new ArrayList<>();
+//        uhcColumns.addAll(getAllGlobalDictColumns());
+//        uhcColumns.addAll(getShardByColumns());
+//        return uhcColumns;
+//    }
 
     public String getProject() {
         DataModelDesc modelDesc = getModel();
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
index 30f533b..28861a1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.cube.model;
 
-import org.apache.kylin.dict.lookup.SnapshotTable;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -33,8 +31,8 @@ SnapshotTableDesc implements java.io.Serializable{
     private String tableName;
 
     @JsonProperty("storage_type")
-    private String storageType = SnapshotTable.STORAGE_TYPE_METASTORE;
-
+    private String storageType = "Parquet";
+    
     @JsonProperty("local_cache_enable")
     private boolean enableLocalCache = true;
 
@@ -65,10 +63,6 @@ SnapshotTableDesc implements java.io.Serializable{
         this.global = global;
     }
 
-    public boolean isExtSnapshotTable() {
-        return !SnapshotTable.STORAGE_TYPE_METASTORE.equals(storageType);
-    }
-
     public boolean isEnableLocalCache() {
         return enableLocalCache;
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index 9023f28..04535fd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -30,7 +30,6 @@ import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
-import org.apache.kylin.dict.GlobalDictionaryBuilder;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,11 +91,6 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
                 }
             }
 
-            if (StringUtils.isNotEmpty(builderClass) && builderClass.equalsIgnoreCase(GlobalDictionaryBuilder.class.getName()) && dimensionColumns.contains(dictCol) && rowKeyDesc.isUseDictionary(dictCol)) {
-                context.addResult(ResultLevel.ERROR, ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE + dictCol);
-                return;
-            }
-
             if (reuseCol != null) {
                 reuseDictionaries.add(dictDesc);
             } else {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index c6d0c00..7c3958c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -18,150 +18,150 @@
 
 package org.apache.kylin.cube.util;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.IReadableTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
-import org.apache.kylin.shaded.com.google.common.hash.Hasher;
-import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+//import java.io.IOException;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Locale;
+//import java.util.Map;
+//import java.util.Set;
+//
+//import org.apache.kylin.common.util.Dictionary;
+//import org.apache.kylin.cube.CubeInstance;
+//import org.apache.kylin.cube.CubeSegment;
+//import org.apache.kylin.cube.cuboid.Cuboid;
+//import org.apache.kylin.cube.model.CubeDesc;
+//import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+//import org.apache.kylin.dict.DictionaryGenerator;
+//import org.apache.kylin.dict.DictionaryInfo;
+//import org.apache.kylin.dict.DictionaryManager;
+//import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+//import org.apache.kylin.measure.hllc.HLLCounter;
+//import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+//import org.apache.kylin.metadata.model.TblColRef;
+//import org.apache.kylin.source.IReadableTable;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import org.apache.kylin.shaded.com.google.common.collect.HashMultimap;
+//import org.apache.kylin.shaded.com.google.common.collect.Maps;
+//import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
+//import org.apache.kylin.shaded.com.google.common.hash.Hasher;
+//import org.apache.kylin.shaded.com.google.common.hash.Hashing;
 
 /**
  */
 public class CubingUtils {
-
-    private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
-
-    public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
-            Iterable<List<String>> streams) {
-        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
-        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
-        final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
-
-        final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-        for (Long cuboidId : allCuboidIds) {
-            result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
-            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
-
-            long mask = Long.highestOneBit(baseCuboidId);
-            int position = 0;
-            for (int i = 0; i < rowkeyLength; i++) {
-                if ((mask & cuboidId) > 0) {
-                    cuboidBitSet[position] = i;
-                    position++;
-                }
-                mask = mask >> 1;
-            }
-            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-        }
-
-        HashFunction hf = Hashing.murmur3_32();
-        byte[][] row_hashcodes = new byte[rowkeyLength][];
-        for (List<String> row : streams) {
-            //generate hash for each row key column
-            for (int i = 0; i < rowkeyLength; i++) {
-                Hasher hc = hf.newHasher();
-                final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
-                if (cell != null) {
-                    row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
-                } else {
-                    row_hashcodes[i] = hc.putInt(0).hash().asBytes();
-                }
-            }
-
-            for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
-                Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
-                HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
-                Hasher hc = hf.newHasher();
-                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
-                for (int position = 0; position < cuboidBitSet.length; position++) {
-                    hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
-                }
-                counter.add(hc.hash().asBytes());
-            }
-        }
-        return result;
-    }
-
-    public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
-            Iterable<List<String>> recordList) throws IOException {
-        final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
-                .listDimensionColumnsExcludingDerived(true);
-        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        int index = 0;
-        for (TblColRef column : columnsNeedToBuildDictionary) {
-            tblColRefMap.put(index++, column);
-        }
-
-        HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-
-        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
-        for (List<String> row : recordList) {
-            for (int i = 0; i < row.size(); i++) {
-                String cell = row.get(i);
-                if (tblColRefMap.containsKey(i)) {
-                    valueMap.put(tblColRefMap.get(i), cell);
-                }
-            }
-        }
-        for (TblColRef tblColRef : valueMap.keySet()) {
-            Set<String> values = valueMap.get(tblColRef);
-            Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
-                    new IterableDictionaryValueEnumerator(values));
-            result.put(tblColRef, dict);
-        }
-        return result;
-    }
-
-    @SuppressWarnings("unchecked")
-    public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
-            Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
-        Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
-
-        for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
-            final TblColRef tblColRef = entry.getKey();
-            final Dictionary<String> dictionary = entry.getValue();
-            IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
-            signature.setLastModifiedTime(System.currentTimeMillis());
-            signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
-            signature.setSize(endOffset - startOffset);
-            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
-            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
-            DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
-            try {
-                DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
-                cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
-                realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
-            } catch (IOException e) {
-                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
-            }
-        }
-
-        return realDictMap;
-    }
-
+//
+//    private static Logger logger = LoggerFactory.getLogger(CubingUtils.class);
+//
+//    public static Map<Long, HLLCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn,
+//            Iterable<List<String>> streams) {
+//        final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc);
+//        final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length;
+//        final Set<Long> allCuboidIds = cubeDesc.getInitialCuboidScheduler().getAllCuboidIds();
+//        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+//        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
+//
+//        final Map<Long, HLLCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+//        for (Long cuboidId : allCuboidIds) {
+//            result.put(cuboidId, new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()));
+//            Integer[] cuboidBitSet = new Integer[Long.bitCount(cuboidId)];
+//
+//            long mask = Long.highestOneBit(baseCuboidId);
+//            int position = 0;
+//            for (int i = 0; i < rowkeyLength; i++) {
+//                if ((mask & cuboidId) > 0) {
+//                    cuboidBitSet[position] = i;
+//                    position++;
+//                }
+//                mask = mask >> 1;
+//            }
+//            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+//        }
+//
+//        HashFunction hf = Hashing.murmur3_32();
+//        byte[][] row_hashcodes = new byte[rowkeyLength][];
+//        for (List<String> row : streams) {
+//            //generate hash for each row key column
+//            for (int i = 0; i < rowkeyLength; i++) {
+//                Hasher hc = hf.newHasher();
+//                final String cell = row.get(flatDesc.getRowKeyColumnIndexes()[i]);
+//                if (cell != null) {
+//                    row_hashcodes[i] = hc.putUnencodedChars(cell).hash().asBytes();
+//                } else {
+//                    row_hashcodes[i] = hc.putInt(0).hash().asBytes();
+//                }
+//            }
+//
+//            for (Map.Entry<Long, HLLCounter> longHyperLogLogPlusCounterNewEntry : result.entrySet()) {
+//                Long cuboidId = longHyperLogLogPlusCounterNewEntry.getKey();
+//                HLLCounter counter = longHyperLogLogPlusCounterNewEntry.getValue();
+//                Hasher hc = hf.newHasher();
+//                final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+//                for (int position = 0; position < cuboidBitSet.length; position++) {
+//                    hc.putBytes(row_hashcodes[cuboidBitSet[position]]);
+//                }
+//                counter.add(hc.hash().asBytes());
+//            }
+//        }
+//        return result;
+//    }
+//
+//    public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance,
+//            Iterable<List<String>> recordList) throws IOException {
+//        final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor()
+//                .listDimensionColumnsExcludingDerived(true);
+//        final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
+//        int index = 0;
+//        for (TblColRef column : columnsNeedToBuildDictionary) {
+//            tblColRefMap.put(index++, column);
+//        }
+//
+//        HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+//
+//        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+//        for (List<String> row : recordList) {
+//            for (int i = 0; i < row.size(); i++) {
+//                String cell = row.get(i);
+//                if (tblColRefMap.containsKey(i)) {
+//                    valueMap.put(tblColRefMap.get(i), cell);
+//                }
+//            }
+//        }
+//        for (TblColRef tblColRef : valueMap.keySet()) {
+//            Set<String> values = valueMap.get(tblColRef);
+//            Dictionary<String> dict = DictionaryGenerator.buildDictionary(tblColRef.getType(),
+//                    new IterableDictionaryValueEnumerator(values));
+//            result.put(tblColRef, dict);
+//        }
+//        return result;
+//    }
+//
+//    @SuppressWarnings("unchecked")
+//    public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment,
+//            Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
+//        Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
+//
+//        for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
+//            final TblColRef tblColRef = entry.getKey();
+//            final Dictionary<String> dictionary = entry.getValue();
+//            IReadableTable.TableSignature signature = new IReadableTable.TableSignature();
+//            signature.setLastModifiedTime(System.currentTimeMillis());
+//            signature.setPath(String.format(Locale.ROOT, "streaming_%s_%s", startOffset, endOffset));
+//            signature.setSize(endOffset - startOffset);
+//            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getColumnDesc(), tblColRef.getDatatype(), signature);
+//            logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
+//            DictionaryManager dictionaryManager = DictionaryManager.getInstance(cubeSegment.getCubeDesc().getConfig());
+//            try {
+//                DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
+//                cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
+//                realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
+//            } catch (IOException e) {
+//                throw new RuntimeException("error save dictionary for column:" + tblColRef, e);
+//            }
+//        }
+//
+//        return realDictMap;
+//    }
+//
 }
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index de436d0..90cabfa 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.cube.model.validation.rule;
 
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_DUPLICATE_DICTIONARY_COLUMN;
-import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_EMPTY;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_SET;
 import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_TRANSITIVE_REUSE;
@@ -37,7 +36,6 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.validation.ValidateContext;
-import org.apache.kylin.dict.GlobalDictionaryBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -75,7 +73,6 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
     @Test
     public void testBadDesc() throws IOException {
         testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, "FakeBuilderClass"));
-        testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("ORDER_ID", null, GlobalDictionaryBuilder.class.getName()));
     }
 
     @Test
@@ -95,17 +92,6 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
                 DictionaryDesc.create("price", "lstg_site_id", null));
     }
 
-    @Test
-    public void testBadDesc5() throws IOException {
-        testDictionaryDesc(ERROR_GLOBAL_DICTIONNARY_ONLY_MEASURE,
-                DictionaryDesc.create("CATEG_LVL2_NAME", null, GlobalDictionaryBuilder.class.getName()));
-    }
-
-    @Test
-    public void testGoodDesc2() throws IOException {
-        testDictionaryDesc(null, DictionaryDesc.create("SELLER_ID", null, GlobalDictionaryBuilder.class.getName()));
-    }
-
     private void testDictionaryDesc(String expectMessage, DictionaryDesc... descs) throws IOException {
         DictionaryRule rule = new DictionaryRule();
         File f = new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_without_slr_left_join_desc.json");
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index a0730ff..f132569 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -40,196 +40,196 @@ import com.google.common.collect.Lists;
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class DictionaryGenerator {
-
-    private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
-
-    public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
-        Preconditions.checkNotNull(dataType, "dataType cannot be null");
-
-        // build dict, case by data type
-        IDictionaryBuilder builder;
-        boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
-        if (dataType.isNumberFamily())
-            builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
-        else
-            builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
-        return builder;
-    }
-
-    public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
-            throws IOException {
-        return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
-    }
-
-    static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
-            IDictionaryValueEnumerator valueEnumerator) throws IOException {
-        int baseId = 0; // always 0 for now
-        int nSamples = 5;
-        ArrayList<String> samples = new ArrayList<String>(nSamples);
-
-        // init the builder
-        builder.init(dictInfo, baseId, null);
-
-        // add values
-        try {
-            while (valueEnumerator.moveNext()) {
-                String value = valueEnumerator.current();
-
-                boolean accept = builder.addValue(value);
-
-                if (accept && samples.size() < nSamples && samples.contains(value) == false)
-                    samples.add(value);
-            }
-        } catch (IOException e) {
-            logger.error("Error during adding dict value.", e);
-            builder.clear();
-            throw e;
-        }
-
-        // build
-        Dictionary<String> dict = builder.build();
-        logger.debug("Dictionary cardinality: " + dict.getSize());
-        logger.debug("Dictionary builder class: " + builder.getClass().getName());
-        logger.debug("Dictionary class: " + dict.getClass().getName());
-        // log a few samples
-        StringBuilder buf = new StringBuilder();
-        for (String s : samples) {
-            if (buf.length() > 0) {
-                buf.append(", ");
-            }
-            buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
-        }
-        logger.debug("Dictionary value samples: " + buf.toString());
-
-        return dict;
-    }
-
-    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
-        List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
-            @Nullable
-            @Override
-            public Dictionary<String> apply(@Nullable DictionaryInfo input) {
-                return input.dictionaryObject;
-            }
-        });
-        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
-    }
-
-    private static class StringTrieDictBuilder implements IDictionaryBuilder {
-        int baseId;
-        TrieDictionaryBuilder builder;
-
-        @Override
-        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-            this.baseId = baseId;
-            this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
-        }
-
-        @Override
-        public boolean addValue(String value) {
-            if (value == null)
-                return false;
-
-            builder.addValue(value);
-            return true;
-        }
-
-        @Override
-        public Dictionary<String> build() throws IOException {
-            return builder.build(baseId);
-        }
-
-        @Override
-        public void clear() {
-
-        }
-    }
-
-    private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
-        TrieDictionaryForestBuilder builder;
-
-        @Override
-        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-            builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
-        }
-
-        @Override
-        public boolean addValue(String value) {
-            if (value == null)
-                return false;
-
-            builder.addValue(value);
-            return true;
-        }
-
-        @Override
-        public Dictionary<String> build() throws IOException {
-            return builder.build();
-        }
-
-        @Override
-        public void clear() {
-
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    private static class NumberTrieDictBuilder implements IDictionaryBuilder {
-        int baseId;
-        NumberDictionaryBuilder builder;
-
-        @Override
-        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-            this.baseId = baseId;
-            this.builder = new NumberDictionaryBuilder();
-        }
-
-        @Override
-        public boolean addValue(String value) {
-            if (StringUtils.isBlank(value)) // empty string is treated as null
-                return false;
-
-            builder.addValue(value);
-            return true;
-        }
-
-        @Override
-        public Dictionary<String> build() throws IOException {
-            return builder.build(baseId);
-        }
-
-        @Override
-        public void clear() {
-
-        }
-    }
-
-    private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
-        NumberDictionaryForestBuilder builder;
-
-        @Override
-        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
-            builder = new NumberDictionaryForestBuilder(baseId);
-        }
-
-        @Override
-        public boolean addValue(String value) {
-            if (StringUtils.isBlank(value)) // empty string is treated as null
-                return false;
-
-            builder.addValue(value);
-            return true;
-        }
-
-        @Override
-        public Dictionary<String> build() throws IOException {
-            return builder.build();
-        }
-
-        @Override
-        public void clear() {
-
-        }
-    }
+//
+//    private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
+//
+//    public static IDictionaryBuilder newDictionaryBuilder(DataType dataType) {
+//        Preconditions.checkNotNull(dataType, "dataType cannot be null");
+//
+//        // build dict, case by data type
+//        IDictionaryBuilder builder;
+//        boolean useForest = KylinConfig.getInstanceFromEnv().isUseForestTrieDictionary();
+//        if (dataType.isNumberFamily())
+//            builder = useForest ? new NumberTrieDictForestBuilder() : new NumberTrieDictBuilder();
+//        else
+//            builder = useForest ? new StringTrieDictForestBuilder() : new StringTrieDictBuilder();
+//        return builder;
+//    }
+//
+//    public static Dictionary<String> buildDictionary(DataType dataType, IDictionaryValueEnumerator valueEnumerator)
+//            throws IOException {
+//        return buildDictionary(newDictionaryBuilder(dataType), null, valueEnumerator);
+//    }
+//
+//    static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo,
+//            IDictionaryValueEnumerator valueEnumerator) throws IOException {
+//        int baseId = 0; // always 0 for now
+//        int nSamples = 5;
+//        ArrayList<String> samples = new ArrayList<String>(nSamples);
+//
+//        // init the builder
+//        builder.init(dictInfo, baseId, null);
+//
+//        // add values
+//        try {
+//            while (valueEnumerator.moveNext()) {
+//                String value = valueEnumerator.current();
+//
+//                boolean accept = builder.addValue(value);
+//
+//                if (accept && samples.size() < nSamples && samples.contains(value) == false)
+//                    samples.add(value);
+//            }
+//        } catch (IOException e) {
+//            logger.error("Error during adding dict value.", e);
+//            builder.clear();
+//            throw e;
+//        }
+//
+//        // build
+//        Dictionary<String> dict = builder.build();
+//        logger.debug("Dictionary cardinality: " + dict.getSize());
+//        logger.debug("Dictionary builder class: " + builder.getClass().getName());
+//        logger.debug("Dictionary class: " + dict.getClass().getName());
+//        // log a few samples
+//        StringBuilder buf = new StringBuilder();
+//        for (String s : samples) {
+//            if (buf.length() > 0) {
+//                buf.append(", ");
+//            }
+//            buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
+//        }
+//        logger.debug("Dictionary value samples: " + buf.toString());
+//
+//        return dict;
+//    }
+//
+//    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
+//        List<Dictionary<String>> dictList = Lists.transform(sourceDicts, new Function<DictionaryInfo, Dictionary<String>>() {
+//            @Nullable
+//            @Override
+//            public Dictionary<String> apply(@Nullable DictionaryInfo input) {
+//                return input.dictionaryObject;
+//            }
+//        });
+//        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(dataType, dictList));
+//    }
+//
+//    private static class StringTrieDictBuilder implements IDictionaryBuilder {
+//        int baseId;
+//        TrieDictionaryBuilder builder;
+//
+//        @Override
+//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+//            this.baseId = baseId;
+//            this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
+//        }
+//
+//        @Override
+//        public boolean addValue(String value) {
+//            if (value == null)
+//                return false;
+//
+//            builder.addValue(value);
+//            return true;
+//        }
+//
+//        @Override
+//        public Dictionary<String> build() throws IOException {
+//            return builder.build(baseId);
+//        }
+//
+//        @Override
+//        public void clear() {
+//
+//        }
+//    }
+//
+//    private static class StringTrieDictForestBuilder implements IDictionaryBuilder {
+//        TrieDictionaryForestBuilder builder;
+//
+//        @Override
+//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+//            builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
+//        }
+//
+//        @Override
+//        public boolean addValue(String value) {
+//            if (value == null)
+//                return false;
+//
+//            builder.addValue(value);
+//            return true;
+//        }
+//
+//        @Override
+//        public Dictionary<String> build() throws IOException {
+//            return builder.build();
+//        }
+//
+//        @Override
+//        public void clear() {
+//
+//        }
+//    }
+//
+//    @SuppressWarnings("deprecation")
+//    private static class NumberTrieDictBuilder implements IDictionaryBuilder {
+//        int baseId;
+//        NumberDictionaryBuilder builder;
+//
+//        @Override
+//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+//            this.baseId = baseId;
+//            this.builder = new NumberDictionaryBuilder();
+//        }
+//
+//        @Override
+//        public boolean addValue(String value) {
+//            if (StringUtils.isBlank(value)) // empty string is treated as null
+//                return false;
+//
+//            builder.addValue(value);
+//            return true;
+//        }
+//
+//        @Override
+//        public Dictionary<String> build() throws IOException {
+//            return builder.build(baseId);
+//        }
+//
+//        @Override
+//        public void clear() {
+//
+//        }
+//    }
+//
+//    private static class NumberTrieDictForestBuilder implements IDictionaryBuilder {
+//        NumberDictionaryForestBuilder builder;
+//
+//        @Override
+//        public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
+//            builder = new NumberDictionaryForestBuilder(baseId);
+//        }
+//
+//        @Override
+//        public boolean addValue(String value) {
+//            if (StringUtils.isBlank(value)) // empty string is treated as null
+//                return false;
+//
+//            builder.addValue(value);
+//            return true;
+//        }
+//
+//        @Override
+//        public Dictionary<String> build() throws IOException {
+//            return builder.build();
+//        }
+//
+//        @Override
+//        public void clear() {
+//
+//        }
+//    }
 
 }
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 9d591b5..c72681b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -63,7 +63,7 @@ public class SnapshotManager {
                 SnapshotManager.logger.info("Snapshot with resource path {} is removed due to {}",
                         notification.getKey(), notification.getCause());
             }
-        }).maximumSize(config.getCachedSnapshotMaxEntrySize())//
+        }).maximumSize(100L)//
                 .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, SnapshotTable>() {
                     @Override
                     public SnapshotTable load(String key) throws Exception {
@@ -173,11 +173,11 @@ public class SnapshotManager {
             KylinConfig cubeConfig) throws IOException {
         String dup = checkDupByInfo(snapshot);
 
-        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
-            throw new IllegalStateException(
-                    "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
-                            + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
-        }
+//        if ((float) snapshot.getSignature().getSize() / 1024 / 1024 > cubeConfig.getTableSnapshotMaxMB()) {
+//            throw new IllegalStateException(
+//                    "Table snapshot should be no greater than " + cubeConfig.getTableSnapshotMaxMB() //
+//                            + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+//        }
 
         if (dup != null) {
             logger.info("Identical input {}, reuse existing snapshot at {}", table.getSignature(), dup);
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 2c4bc6a..31cc081 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.job;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -32,7 +31,6 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyColDesc;
-import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
@@ -113,16 +111,16 @@ public class JoinedFlatTable {
             kylinConfig = (flatDesc.getSegment()).getConfig();
         }
 
-        if (kylinConfig.isAdvancedFlatTableUsed()) {
-            try {
-                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
-                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
-                        JobEngineConfig.class);
-                return (String) method.invoke(null, flatDesc);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
+//        if (kylinConfig.isAdvancedFlatTableUsed()) {
+//            try {
+//                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
+//                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class,
+//                        JobEngineConfig.class);
+//                return (String) method.invoke(null, flatDesc);
+//            } catch (Exception e) {
+//                throw new RuntimeException(e);
+//            }
+//        }
 
         return "INSERT OVERWRITE TABLE " + quoteIdentifier(flatDesc.getTableName(), null) + " " + generateSelectDataStatement(flatDesc)
                 + ";\n";
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
index 422a802..937c1ad 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DimensionEncodingFactory.java
@@ -32,6 +32,7 @@ import org.apache.kylin.shaded.com.google.common.base.Predicate;
 import org.apache.kylin.shaded.com.google.common.collect.Iterables;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 
+@Deprecated
 public abstract class DimensionEncodingFactory {
 
     private static final Logger logger = LoggerFactory.getLogger(DimensionEncodingFactory.class);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
index acdc2e9..55bdb4b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java
@@ -20,7 +20,6 @@ package org.apache.kylin.storage;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.threadlocal.InternalThreadLocal;
-import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.ImplementationSwitch;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -32,19 +31,7 @@ public class StorageFactory {
     // Use thread-local because KylinConfig can be thread-local and implementation might be different among multiple threads.
     private static InternalThreadLocal<ImplementationSwitch<IStorage>> storages = new InternalThreadLocal<>();
 
-    private static IStorage configuredUseLocalStorage;
-
-    static {
-        String localStorageImpl = KylinConfig.getInstanceFromEnv().getLocalStorageImpl();
-        if (localStorageImpl != null){
-            configuredUseLocalStorage = (IStorage) ClassUtil.newInstance(localStorageImpl);
-        }
-    }
-
     public static IStorage storage(IStorageAware aware) {
-        if (configuredUseLocalStorage != null) {
-            return configuredUseLocalStorage;
-        }
         ImplementationSwitch<IStorage> current = storages.get();
         if (storages.get() == null) {
             current = new ImplementationSwitch<>(KylinConfig.getInstanceFromEnv().getStorageEngines(), IStorage.class);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index e6e0737..3fa104d 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -28,7 +28,6 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -80,9 +79,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
             TupleFilter havingFilter, StorageContext context) {
         this.context = context;
 
-        this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
-        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
-        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
+//        this.maxScanRanges = cubeSegment.getConfig().getQueryStorageVisitScanRangeMax();
+//        this.maxFuzzyKeysPerSplit = cubeSegment.getConfig().getQueryScanFuzzyKeyMax();
+//        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * cubeSegment.getConfig().getQueryScanFuzzyKeySplitMax();
 
         this.cubeSegment = cubeSegment;
         this.cubeDesc = cubeSegment.getCubeDesc();
@@ -148,9 +147,9 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
      */
     public CubeScanRangePlanner(GTInfo info, TblColRef gtPartitionCol, TupleFilter gtFilter) {
 
-        this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
-        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
-        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
+//        this.maxScanRanges = KylinConfig.getInstanceFromEnv().getQueryStorageVisitScanRangeMax();
+//        this.maxFuzzyKeysPerSplit = KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeyMax();
+//        this.maxFuzzyKeys = maxFuzzyKeysPerSplit * KylinConfig.getInstanceFromEnv().getQueryScanFuzzyKeySplitMax();
 
         this.gtInfo = info;
 
@@ -175,7 +174,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
                     .setRtAggrMetrics(gtRtAggrMetrics).setDynamicColumns(gtDynColumns)
                     .setExprsPushDown(tupleExpressionMap)//
                     .setAllowStorageAggregation(context.isNeedStorageAggregation())
-                    .setAggCacheMemThreshold(cubeSegment.getConfig().getQueryCoprocessorMemGB())//
+                    .setAggCacheMemThreshold(0)//
                     .setStoragePushDownLimit(context.getFinalPushDownLimit())
                     .setStorageLimitLevel(context.getStorageLimitLevel()).setHavingFilterPushDown(havingFilter)
                     .createGTScanRequest();
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 3adbb8e..b76fe47 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -26,12 +26,10 @@ import java.util.Set;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.dict.BuiltInFunctionTransformer;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.metadata.expression.TupleExpression;
-import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.StringCodeSystem;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilterSerializer;
@@ -68,10 +66,6 @@ public class CubeSegmentScanner implements Iterable<GTRecord> {
         byte[] serialize = TupleFilterSerializer.serialize(originalfilter, StringCodeSystem.INSTANCE);
         TupleFilter filter = TupleFilterSerializer.deserialize(serialize, StringCodeSystem.INSTANCE);
 
-        // translate FunctionTupleFilter to IN clause
-        ITupleFilterTransformer translator = new BuiltInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
-        filter = translator.transform(filter);
-
         CubeScanRangePlanner scanRangePlanner;
         try {
             scanRangePlanner = new CubeScanRangePlanner(cubeSeg, cuboid, filter, dimensions, groups, dynGroups,
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 4fb71bb..db08e62 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -36,7 +36,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
 import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.dimension.TimeDerivedColumnType;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
@@ -45,6 +44,7 @@ import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.cube.ILookupTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,7 +97,7 @@ public class CubeTupleConverter implements ITupleConverter {
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
         usedLookupTables = Lists.newArrayList();
-        eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
+//        eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
         autoJustByTimezone = eventTimezone.length() > 0
                 && cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable();
         if (autoJustByTimezone) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 804ce3f..63ef36d 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -33,6 +33,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.ILookupTable;
 import org.apache.kylin.cube.RawQueryLastHacker;
 import org.apache.kylin.cube.common.SegmentPruner;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -41,7 +42,6 @@ import org.apache.kylin.cube.gridtable.CuboidToGridTableMappingExt;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
-import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index 9bfdd76..fea4550 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -25,9 +25,9 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.ILookupTable;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
-import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeOrder;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
index 574bb9f..47a3ba7 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/DictGridTableTest.java
@@ -18,721 +18,673 @@
 
 package org.apache.kylin.storage.gtrecord;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.List;
-
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.gridtable.CubeCodeSystem;
-import org.apache.kylin.dict.NumberDictionaryForestBuilder;
-import org.apache.kylin.dict.StringBytesConverter;
-import org.apache.kylin.dict.TrieDictionaryBuilder;
-import org.apache.kylin.dimension.DictionaryDimEnc;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.gridtable.GTBuilder;
-import org.apache.kylin.gridtable.GTFilterScanner.FilterResultCache;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTInfo.Builder;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRange;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTScanRequestBuilder;
-import org.apache.kylin.gridtable.GTUtil;
-import org.apache.kylin.gridtable.GridTable;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.ExtractTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.model.TblColRef.InnerDataTypeEnum;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
 
 public class DictGridTableTest extends LocalFileMetadataTestCase {
-
-    private GridTable table;
-    private GTInfo info;
-    private CompareTupleFilter timeComp0;
-    private CompareTupleFilter timeComp1;
-    private CompareTupleFilter timeComp2;
-    private CompareTupleFilter timeComp3;
-    private CompareTupleFilter timeComp4;
-    private CompareTupleFilter timeComp5;
-    private CompareTupleFilter timeComp6;
-    private CompareTupleFilter timeComp7;
-    private CompareTupleFilter ageComp1;
-    private CompareTupleFilter ageComp2;
-    private CompareTupleFilter ageComp3;
-    private CompareTupleFilter ageComp4;
-
-    @After
-    public void after() throws Exception {
-
-        this.cleanupTestMetadata();
-    }
-
-    @Before
-    public void setup() throws IOException {
-
-        this.createTestMetadata();
-
-        table = newTestTable();
-        info = table.getInfo();
-
-        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
-        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
-        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
-        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
-        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
-        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
-        timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
-        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
-        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
-        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
-        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
-
-    }
-
-    @Test
-    public void verifySegmentSkipping() {
-
-        ByteArray segmentStart = enc(info, 0, "2015-01-14");
-        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
-        assertEquals(segmentStart, segmentStartX);
-
-        {
-            LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());//scan range are [close,close]
-            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-            assertEquals(1, r.get(0).fuzzyKeys.size());
-            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-        }
-        {
-            LogicalTupleFilter filter = and(timeComp2, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-        }
-        {
-            LogicalTupleFilter filter = and(timeComp4, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-        }
-        {
-            LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-        }
-        {
-            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
-                    and(timeComp6, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(2, r.size());
-            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
-            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
-                    r.get(1).fuzzyKeys.toString());
-        }
-        {
-            LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
-        }
-        {
-            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(2, r.size());
-            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
-            assertEquals(0, r.get(1).fuzzyKeys.size());
-        }
-        {
-            //skip FALSE filter
-            LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
-        }
-        {
-            //TRUE or FALSE filter
-            LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[null, null]-[null, null]", r.get(0).toString());
-        }
-        {
-            //TRUE or other filter
-            LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[null, null]-[null, null]", r.get(0).toString());
-        }
-    }
-
-    @Test
-    public void verifySegmentSkipping2() {
-        {
-            LogicalTupleFilter filter = and(timeComp0, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());//scan range are [close,close]
-            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
-            assertEquals(1, r.get(0).fuzzyKeys.size());
-            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
-        }
-
-        {
-            LogicalTupleFilter filter = and(timeComp5, ageComp1);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());//scan range are [close,close]
-        }
-    }
-
-    @Test
-    public void verifyScanRangePlanner() {
-
-        // flatten or-and & hbase fuzzy value
-        {
-            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(1, r.size());
-            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
-            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
-        }
-
-        // pre-evaluate ever false
-        {
-            LogicalTupleFilter filter = and(timeComp1, timeComp2);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(0, r.size());
-        }
-
-        // pre-evaluate ever true
-        {
-            LogicalTupleFilter filter = or(timeComp1, ageComp4);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals("[[null, null]-[null, null]]", r.toString());
-        }
-
-        // merge overlap range
-        {
-            LogicalTupleFilter filter = or(timeComp1, timeComp3);
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals("[[null, null]-[null, null]]", r.toString());
-        }
-
-        // merge too many ranges
-        {
-            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
-                    and(timeComp4, ageComp3));
-            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
-            List<GTScanRange> r = planner.planScanRanges();
-            assertEquals(3, r.size());
-            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
-            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
-            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
-            planner.setMaxScanRanges(2);
-            List<GTScanRange> r2 = planner.planScanRanges();
-            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
-        }
-    }
-
-    @Test
-    public void verifyFirstRow() throws IOException {
-        doScanAndVerify(table,
-                new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
-                        .setFilterPushDown(null).createGTScanRequest(),
-                "[1421193600000, 30, Yang, 10, 10.5]", //
-                "[1421193600000, 30, Luke, 10, 10.5]", //
-                "[1421280000000, 20, Dong, 10, 10.5]", //
-                "[1421280000000, 20, Jason, 10, 10.5]", //
-                "[1421280000000, 30, Xu, 10, 10.5]", //
-                "[1421366400000, 20, Mahone, 10, 10.5]", //
-                "[1421366400000, 20, Qianhao, 10, 10.5]", //
-                "[1421366400000, 30, George, 10, 10.5]", //
-                "[1421366400000, 30, Shaofeng, 10, 10.5]", //
-                "[1421452800000, 10, Kejia, 10, 10.5]");
-    }
-
-    //for testing GTScanRequest serialization and deserialization
-    public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
-        ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-        GTScanRequest.serializer.serialize(origin, buffer);
-        buffer.flip();
-        GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
-
-        Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
-        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
-        return sGTScanRequest;
-    }
-
-    @Test
-    public void verifyScanWithUnevaluatableFilter() throws IOException {
-        GTInfo info = table.getInfo();
-
-        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
-        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
-        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
-
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-                .setFilterPushDown(filter).createGTScanRequest();
-
-        // note the unEvaluatable column 1 in filter is added to group by
-        assertEquals(
-                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-                req.toString());
-
-        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
-                "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
-                "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
-    }
-
-    @Test
-    public void verifyScanWithEvaluatableFilter() throws IOException {
-        GTInfo info = table.getInfo();
-
-        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
-        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
-        LogicalTupleFilter filter = and(fComp1, fComp2);
-
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
-                .setFilterPushDown(filter).createGTScanRequest();
-        // note the evaluatable column 1 in filter is added to returned columns but not in group by
-        assertEquals(
-                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
-                req.toString());
-
-        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
-                "[1421366400000, 20, null, 40, null]");
-    }
-
-    @Test
-    public void verifyAggregateAndHavingFilter() throws IOException {
-        GTInfo info = table.getInfo();
-
-        TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
-        havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
-        CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
-
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
-                .setHavingFilterPushDown(havingFilter).createGTScanRequest();
-
-        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
-                "[null, 30, null, null, 52.5]");
-    }
-
-    @SuppressWarnings("unused")
-    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
-            throws IOException {
-        long start = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
-                .setFilterPushDown(filter).createGTScanRequest();
-        int i = 0;
-        try (IGTScanner scanner = table.scan(req)) {
-            for (GTRecord r : scanner) {
-                i++;
-            }
-        }
-        long end = System.currentTimeMillis();
-        System.out.println(
-                (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
-    }
-
-    @Test
-    public void verifyConvertFilterConstants1() {
-        GTInfo info = table.getInfo();
-
-        TableDesc extTable = TableDesc.mockup("ext");
-        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-
-        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
-        LogicalTupleFilter filter = and(fComp1, fComp2);
-
-        List<TblColRef> colMapping = Lists.newArrayList();
-        colMapping.add(extColA);
-        colMapping.add(extColB);
-
-        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-        assertEquals(
-                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
-                newFilter.toString());
-    }
-
-    @Test
-    public void verifyConvertFilterConstants2() {
-        GTInfo info = table.getInfo();
-
-        TableDesc extTable = TableDesc.mockup("ext");
-        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-
-        List<TblColRef> colMapping = Lists.newArrayList();
-        colMapping.add(extColA);
-        colMapping.add(extColB);
-        
-        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-        
-        // $1<"9" round down to FALSE
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-                    newFilter.toString());
-        }
-
-        // $1<"10" needs no rounding
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
-                    newFilter.toString());
-        }
-        
-        // $1<"11" round down to <="10"
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-                    newFilter.toString());
-        }
-        
-        // $1<="9" round down to FALSE
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(ConstantTupleFilter.FALSE, newFilter);
-        }
-        
-        // $1<="10" needs no rounding
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-                    newFilter.toString());
-        }
-        
-        // $1<="11" round down to <="10"
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
-                    newFilter.toString());
-        }
-    }
-
-    @Test
-    public void verifyConvertFilterConstants3() {
-        GTInfo info = table.getInfo();
-
-        TableDesc extTable = TableDesc.mockup("ext");
-        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-
-        List<TblColRef> colMapping = Lists.newArrayList();
-        colMapping.add(extColA);
-        colMapping.add(extColB);
-        
-        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-        
-        // $1>"101" round up to FALSE
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
-                    newFilter.toString());
-        }
-
-        // $1>"100" needs no rounding
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
-                    newFilter.toString());
-        }
-        
-        // $1>"99" round up to >="100"
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-                    newFilter.toString());
-        }
-        
-        // $1>="101" round up to FALSE
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(ConstantTupleFilter.FALSE, newFilter);
-        }
-        
-        // $1>="100" needs no rounding
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-                    newFilter.toString());
-        }
-        
-        // $1>="99" round up to >="100"
-        {
-            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
-            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-            assertEquals(
-                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
-                    newFilter.toString());
-        }
-    }
-
-    @Test
-    public void verifyConvertFilterConstants4() {
-        GTInfo info = table.getInfo();
-
-        TableDesc extTable = TableDesc.mockup("ext");
-        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
-        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
-
-        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
-        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
-        LogicalTupleFilter filter = and(fComp1, fComp2);
-
-        List<TblColRef> colMapping = Lists.newArrayList();
-        colMapping.add(extColA);
-        colMapping.add(extColB);
-
-        // $1 in ("9", "10", "15") has only "10" left
-        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
-        assertEquals(
-                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
-                newFilter.toString());
-    }
-
-    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
-        System.out.println(req);
-        try (IGTScanner scanner = table.scan(req)) {
-            int i = 0;
-            for (GTRecord r : scanner) {
-                System.out.println(r);
-                if (verifyRows == null || i >= verifyRows.length) {
-                    Assert.fail();
-                }
-                assertEquals(verifyRows[i], r.toString());
-                i++;
-            }
-        }
-    }
-
-    public static ByteArray enc(GTInfo info, int col, String value) {
-        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
-        info.getCodeSystem().encodeColumnValue(col, value, buf);
-        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
-    }
-
-    public static ExtractTupleFilter unevaluatable(TblColRef col) {
-        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
-        r.addChild(new ColumnTupleFilter(col));
-        return r;
-    }
-
-    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
-        CompareTupleFilter result = new CompareTupleFilter(op);
-        result.addChild(new ColumnTupleFilter(col));
-        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
-        return result;
-    }
-
-    public static LogicalTupleFilter and(TupleFilter... children) {
-        return logic(FilterOperatorEnum.AND, children);
-    }
-
-    public static LogicalTupleFilter or(TupleFilter... children) {
-        return logic(FilterOperatorEnum.OR, children);
-    }
-
-    public static LogicalTupleFilter not(TupleFilter child) {
-        return logic(FilterOperatorEnum.NOT, child);
-    }
-
-    public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
-        LogicalTupleFilter result = new LogicalTupleFilter(op);
-        for (TupleFilter c : children) {
-            result.addChild(c);
-        }
-        return result;
-    }
-
-    public static GridTable newTestTable() throws IOException {
-        GTInfo info = newInfo();
-        GTSimpleMemStore store = new GTSimpleMemStore(info);
-        GridTable table = new GridTable(info, store);
-
-        GTRecord r = new GTRecord(table.getInfo());
-        GTBuilder builder = table.rebuild();
-
-        builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-        builder.close();
-
-        return table;
-    }
-
-    static GridTable newTestPerfTable() throws IOException {
-        GTInfo info = newInfo();
-        GTSimpleMemStore store = new GTSimpleMemStore(info);
-        GridTable table = new GridTable(info, store);
-
-        GTRecord r = new GTRecord(table.getInfo());
-        GTBuilder builder = table.rebuild();
-
-        for (int i = 0; i < 100000; i++) {
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
-
-            for (int j = 0; j < 10; j++)
-                builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
-        }
-        builder.close();
-
-        return table;
-    }
-
-    static GTInfo newInfo() {
-        Builder builder = GTInfo.builder();
-        builder.setCodeSystem(newDictCodeSystem());
-        builder.setColumns(//
-                DataType.getType("timestamp"), //
-                DataType.getType("integer"), //
-                DataType.getType("varchar(10)"), //
-                DataType.getType("bigint"), //
-                DataType.getType("decimal") //
-        );
-        builder.setPrimaryKey(setOf(0, 1));
-        builder.setColumnPreferIndex(setOf(0));
-        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
-        builder.enableRowBlock(4);
-        GTInfo info = builder.build();
-        return info;
-    }
-
-    private static CubeCodeSystem newDictCodeSystem() {
-        DimensionEncoding[] dimEncs = new DimensionEncoding[3];
-        dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
-        dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
-        return new CubeCodeSystem(dimEncs);
-    }
-
-    private static Dictionary newDictionaryOfString() {
-        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
-        builder.addValue("Dong");
-        builder.addValue("George");
-        builder.addValue("Jason");
-        builder.addValue("Kejia");
-        builder.addValue("Luke");
-        builder.addValue("Mahone");
-        builder.addValue("Qianhao");
-        builder.addValue("Shaofeng");
-        builder.addValue("Xu");
-        builder.addValue("Yang");
-        return builder.build(0);
-    }
-
-    private static Dictionary newDictionaryOfInteger() {
-        NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
-        builder.addValue("10");
-        builder.addValue("20");
-        builder.addValue("30");
-        builder.addValue("40");
-        builder.addValue("50");
-        builder.addValue("60");
-        builder.addValue("70");
-        builder.addValue("80");
-        builder.addValue("90");
-        builder.addValue("100");
-        return builder.build();
-    }
-
-    public static ImmutableBitSet setOf(int... values) {
-        BitSet set = new BitSet();
-        for (int i : values)
-            set.set(i);
-        return new ImmutableBitSet(set);
-    }
+//
+//    private GridTable table;
+//    private GTInfo info;
+//    private CompareTupleFilter timeComp0;
+//    private CompareTupleFilter timeComp1;
+//    private CompareTupleFilter timeComp2;
+//    private CompareTupleFilter timeComp3;
+//    private CompareTupleFilter timeComp4;
+//    private CompareTupleFilter timeComp5;
+//    private CompareTupleFilter timeComp6;
+//    private CompareTupleFilter timeComp7;
+//    private CompareTupleFilter ageComp1;
+//    private CompareTupleFilter ageComp2;
+//    private CompareTupleFilter ageComp3;
+//    private CompareTupleFilter ageComp4;
+//
+//    @After
+//    public void after() throws Exception {
+//
+//        this.cleanupTestMetadata();
+//    }
+//
+//    @Before
+//    public void setup() throws IOException {
+//
+//        this.createTestMetadata();
+//
+//        table = newTestTable();
+//        info = table.getInfo();
+//
+//        timeComp0 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-14"));
+//        timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+//        timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13"));
+//        timeComp3 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-15"));
+//        timeComp4 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-15"));
+//        timeComp5 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-15"));
+//        timeComp6 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "2015-01-14"));
+//        timeComp7 = compare(info.colRef(0), FilterOperatorEnum.EQ, enc(info, 0, "1970-01-01"));
+//        ageComp1 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "10"));
+//        ageComp2 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "20"));
+//        ageComp3 = compare(info.colRef(1), FilterOperatorEnum.EQ, enc(info, 1, "30"));
+//        ageComp4 = compare(info.colRef(1), FilterOperatorEnum.NEQ, enc(info, 1, "30"));
+//
+//    }
+//
+//    @Test
+//    public void verifySegmentSkipping() {
+//
+//        ByteArray segmentStart = enc(info, 0, "2015-01-14");
+//        ByteArray segmentStartX = enc(info, 0, "2015-01-14 00:00:00");//when partition col is dict encoded, time format will be free
+//        assertEquals(segmentStart, segmentStartX);
+//
+//        {
+//            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());//scan range are [close,close]
+//            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+//            assertEquals(1, r.get(0).fuzzyKeys.size());
+//            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+//        }
+//        {
+//            LogicalTupleFilter filter = and(timeComp2, ageComp1);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());
+//        }
+//        {
+//            LogicalTupleFilter filter = and(timeComp4, ageComp1);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());
+//        }
+//        {
+//            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());
+//        }
+//        {
+//            LogicalTupleFilter filter = or(and(timeComp2, ageComp1), and(timeComp1, ageComp1),
+//                    and(timeComp6, ageComp1));
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(2, r.size());
+//            assertEquals("[1421193600000, 10]-[null, 10]", r.get(1).toString());
+//            assertEquals("[[null, 10, null, null, null], [1421193600000, 10, null, null, null]]",
+//                    r.get(1).fuzzyKeys.toString());
+//        }
+//        {
+//            LogicalTupleFilter filter = or(and(timeComp3, ageComp3), and(timeComp7, ageComp1));
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals("[[0, 10]-[1421280000000, 30]]", r.toString());
+//        }
+//        {
+//            LogicalTupleFilter filter = or(timeComp2, timeComp1, timeComp6);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(2, r.size());
+//            assertEquals("[1421193600000, null]-[null, null]", r.get(1).toString());
+//            assertEquals(0, r.get(1).fuzzyKeys.size());
+//        }
+//        {
+//            //skip FALSE filter
+//            LogicalTupleFilter filter = and(ageComp1, ConstantTupleFilter.FALSE);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(0, r.size());
+//        }
+//        {
+//            //TRUE or FALSE filter
+//            LogicalTupleFilter filter = or(ConstantTupleFilter.TRUE, ConstantTupleFilter.FALSE);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());
+//            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+//        }
+//        {
+//            //TRUE or other filter
+//            LogicalTupleFilter filter = or(ageComp1, ConstantTupleFilter.TRUE);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());
+//            assertEquals("[null, null]-[null, null]", r.get(0).toString());
+//        }
+//    }
+//
+//    @Test
+//    public void verifySegmentSkipping2() {
+//        {
+//            LogicalTupleFilter filter = and(timeComp0, ageComp1);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());//scan range are [close,close]
+//            assertEquals("[null, 10]-[1421193600000, 10]", r.get(0).toString());
+//            assertEquals(1, r.get(0).fuzzyKeys.size());
+//            assertEquals("[[null, 10, null, null, null]]", r.get(0).fuzzyKeys.toString());
+//        }
+//
+//        {
+//            LogicalTupleFilter filter = and(timeComp5, ageComp1);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, info.colRef(0), filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());//scan range are [close,close]
+//        }
+//    }
+//
+//    @Test
+//    public void verifyScanRangePlanner() {
+//
+//        // flatten or-and & hbase fuzzy value
+//        {
+//            LogicalTupleFilter filter = and(timeComp1, or(ageComp1, ageComp2));
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(1, r.size());
+//            assertEquals("[1421193600000, 10]-[null, 20]", r.get(0).toString());
+//            assertEquals("[[null, 10, null, null, null], [null, 20, null, null, null]]", r.get(0).fuzzyKeys.toString());
+//        }
+//
+//        // pre-evaluate ever false
+//        {
+//            LogicalTupleFilter filter = and(timeComp1, timeComp2);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(0, r.size());
+//        }
+//
+//        // pre-evaluate ever true
+//        {
+//            LogicalTupleFilter filter = or(timeComp1, ageComp4);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals("[[null, null]-[null, null]]", r.toString());
+//        }
+//
+//        // merge overlap range
+//        {
+//            LogicalTupleFilter filter = or(timeComp1, timeComp3);
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals("[[null, null]-[null, null]]", r.toString());
+//        }
+//
+//        // merge too many ranges
+//        {
+//            LogicalTupleFilter filter = or(and(timeComp4, ageComp1), and(timeComp4, ageComp2),
+//                    and(timeComp4, ageComp3));
+//            CubeScanRangePlanner planner = new CubeScanRangePlanner(info, null, filter);
+//            List<GTScanRange> r = planner.planScanRanges();
+//            assertEquals(3, r.size());
+//            assertEquals("[1421280000000, 10]-[1421280000000, 10]", r.get(0).toString());
+//            assertEquals("[1421280000000, 20]-[1421280000000, 20]", r.get(1).toString());
+//            assertEquals("[1421280000000, 30]-[1421280000000, 30]", r.get(2).toString());
+//            planner.setMaxScanRanges(2);
+//            List<GTScanRange> r2 = planner.planScanRanges();
+//            assertEquals("[[1421280000000, 10]-[1421280000000, 30]]", r2.toString());
+//        }
+//    }
+//
+//    @Test
+//    public void verifyFirstRow() throws IOException {
+//        doScanAndVerify(table,
+//                new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null)
+//                        .setFilterPushDown(null).createGTScanRequest(),
+//                "[1421193600000, 30, Yang, 10, 10.5]", //
+//                "[1421193600000, 30, Luke, 10, 10.5]", //
+//                "[1421280000000, 20, Dong, 10, 10.5]", //
+//                "[1421280000000, 20, Jason, 10, 10.5]", //
+//                "[1421280000000, 30, Xu, 10, 10.5]", //
+//                "[1421366400000, 20, Mahone, 10, 10.5]", //
+//                "[1421366400000, 20, Qianhao, 10, 10.5]", //
+//                "[1421366400000, 30, George, 10, 10.5]", //
+//                "[1421366400000, 30, Shaofeng, 10, 10.5]", //
+//                "[1421452800000, 10, Kejia, 10, 10.5]");
+//    }
+//
+//    //for testing GTScanRequest serialization and deserialization
+//    public static GTScanRequest useDeserializedGTScanRequest(GTScanRequest origin) {
+//        ByteBuffer buffer = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
+//        GTScanRequest.serializer.serialize(origin, buffer);
+//        buffer.flip();
+//        GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
+//
+//        Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
+//        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
+//        return sGTScanRequest;
+//    }
+//
+//    @Test
+//    public void verifyScanWithUnevaluatableFilter() throws IOException {
+//        GTInfo info = table.getInfo();
+//
+//        CompareTupleFilter fComp = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+//        ExtractTupleFilter fUnevaluatable = unevaluatable(info.colRef(1));
+//        LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
+//        LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
+//
+//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+//                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+//                .setFilterPushDown(filter).createGTScanRequest();
+//
+//        // note the unEvaluatable column 1 in filter is added to group by
+//        assertEquals(
+//                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [], []], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+//                req.toString());
+//
+//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 20, null]",
+//                "[1421280000000, 30, null, 10, null]", "[1421366400000, 20, null, 20, null]",
+//                "[1421366400000, 30, null, 20, null]", "[1421452800000, 10, null, 10, null]");
+//    }
+//
+//    @Test
+//    public void verifyScanWithEvaluatableFilter() throws IOException {
+//        GTInfo info = table.getInfo();
+//
+//        CompareTupleFilter fComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14"));
+//        CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
+//        LogicalTupleFilter filter = and(fComp1, fComp2);
+//
+//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+//                .setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[] { "sum" })
+//                .setFilterPushDown(filter).createGTScanRequest();
+//        // note the evaluatable column 1 in filter is added to returned columns but not in group by
+//        assertEquals(
+//                "GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]",
+//                req.toString());
+//
+//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[1421280000000, 20, null, 30, null]",
+//                "[1421366400000, 20, null, 40, null]");
+//    }
+//
+//    @Test
+//    public void verifyAggregateAndHavingFilter() throws IOException {
+//        GTInfo info = table.getInfo();
+//
+//        TblColRef havingCol = TblColRef.newInnerColumn("SUM_OF_BIGDECIMAL", InnerDataTypeEnum.LITERAL);
+//        havingCol.getColumnDesc().setId("1"); // point to the first aggregated measure
+//        CompareTupleFilter havingFilter = compare(havingCol, FilterOperatorEnum.GT, "20");
+//
+//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+//                .setAggrGroupBy(setOf(1)).setAggrMetrics(setOf(4)).setAggrMetricsFuncs(new String[] { "sum" })
+//                .setHavingFilterPushDown(havingFilter).createGTScanRequest();
+//
+//        doScanAndVerify(table, useDeserializedGTScanRequest(req), "[null, 20, null, null, 42.0]",
+//                "[null, 30, null, null, 52.5]");
+//    }
+//
+//    @SuppressWarnings("unused")
+//    private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter)
+//            throws IOException {
+//        long start = System.currentTimeMillis();
+//        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null)
+//                .setFilterPushDown(filter).createGTScanRequest();
+//        int i = 0;
+//        try (IGTScanner scanner = table.scan(req)) {
+//            for (GTRecord r : scanner) {
+//                i++;
+//            }
+//        }
+//        long end = System.currentTimeMillis();
+//        System.out.println(
+//                (end - start) + "ms with filter cache enabled=" + FilterResultCache.DEFAULT_OPTION + ", " + i + " rows");
+//    }
+//
+//    @Test
+//    public void verifyConvertFilterConstants1() {
+//        GTInfo info = table.getInfo();
+//
+//        TableDesc extTable = TableDesc.mockup("ext");
+//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+//
+//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+//        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.EQ, "10");
+//        LogicalTupleFilter filter = and(fComp1, fComp2);
+//
+//        List<TblColRef> colMapping = Lists.newArrayList();
+//        colMapping.add(extColA);
+//        colMapping.add(extColB);
+//
+//        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//        assertEquals(
+//                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 EQ [\\x00]]",
+//                newFilter.toString());
+//    }
+//
+//    @Test
+//    public void verifyConvertFilterConstants2() {
+//        GTInfo info = table.getInfo();
+//
+//        TableDesc extTable = TableDesc.mockup("ext");
+//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+//
+//        List<TblColRef> colMapping = Lists.newArrayList();
+//        colMapping.add(extColA);
+//        colMapping.add(extColB);
+//
+//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+//
+//        // $1<"9" round down to FALSE
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "9"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1<"10" needs no rounding
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "10"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LT [\\x00]]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1<"11" round down to <="10"
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LT, "11"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1<="9" round down to FALSE
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "9"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(ConstantTupleFilter.FALSE, newFilter);
+//        }
+//
+//        // $1<="10" needs no rounding
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "10"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1<="11" round down to <="10"
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.LTE, "11"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 LTE [\\x00]]",
+//                    newFilter.toString());
+//        }
+//    }
+//
+//    @Test
+//    public void verifyConvertFilterConstants3() {
+//        GTInfo info = table.getInfo();
+//
+//        TableDesc extTable = TableDesc.mockup("ext");
+//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+//
+//        List<TblColRef> colMapping = Lists.newArrayList();
+//        colMapping.add(extColA);
+//        colMapping.add(extColB);
+//
+//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+//
+//        // $1>"101" round up to FALSE
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "101"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals("AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 ISNOTNULL []]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1>"100" needs no rounding
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "100"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GT [\\x09]]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1>"99" round up to >="100"
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GT, "99"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1>="101" round up to FALSE
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "101"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(ConstantTupleFilter.FALSE, newFilter);
+//        }
+//
+//        // $1>="100" needs no rounding
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "100"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+//                    newFilter.toString());
+//        }
+//
+//        // $1>="99" round up to >="100"
+//        {
+//            LogicalTupleFilter filter = and(fComp1, compare(extColB, FilterOperatorEnum.GTE, "99"));
+//            TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//            assertEquals(
+//                    "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 GTE [\\x09]]",
+//                    newFilter.toString());
+//        }
+//    }
+//
+//    @Test
+//    public void verifyConvertFilterConstants4() {
+//        GTInfo info = table.getInfo();
+//
+//        TableDesc extTable = TableDesc.mockup("ext");
+//        TblColRef extColA = TblColRef.mockup(extTable, 1, "A", "timestamp");
+//        TblColRef extColB = TblColRef.mockup(extTable, 2, "B", "integer");
+//
+//        CompareTupleFilter fComp1 = compare(extColA, FilterOperatorEnum.GT, "2015-01-14");
+//        CompareTupleFilter fComp2 = compare(extColB, FilterOperatorEnum.IN, "9", "10", "15");
+//        LogicalTupleFilter filter = and(fComp1, fComp2);
+//
+//        List<TblColRef> colMapping = Lists.newArrayList();
+//        colMapping.add(extColA);
+//        colMapping.add(extColB);
+//
+//        // $1 in ("9", "10", "15") has only "10" left
+//        TupleFilter newFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, colMapping, null);
+//        assertEquals(
+//                "AND [UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], UNKNOWN_MODEL:NULL.GT_MOCKUP_TABLE.1 IN [\\x00]]",
+//                newFilter.toString());
+//    }
+//
+//    private void doScanAndVerify(GridTable table, GTScanRequest req, String... verifyRows) throws IOException {
+//        System.out.println(req);
+//        try (IGTScanner scanner = table.scan(req)) {
+//            int i = 0;
+//            for (GTRecord r : scanner) {
+//                System.out.println(r);
+//                if (verifyRows == null || i >= verifyRows.length) {
+//                    Assert.fail();
+//                }
+//                assertEquals(verifyRows[i], r.toString());
+//                i++;
+//            }
+//        }
+//    }
+//
+//    public static ByteArray enc(GTInfo info, int col, String value) {
+//        ByteBuffer buf = ByteBuffer.allocate(info.getMaxColumnLength());
+//        info.getCodeSystem().encodeColumnValue(col, value, buf);
+//        return ByteArray.copyOf(buf.array(), buf.arrayOffset(), buf.position());
+//    }
+//
+//    public static ExtractTupleFilter unevaluatable(TblColRef col) {
+//        ExtractTupleFilter r = new ExtractTupleFilter(FilterOperatorEnum.EXTRACT);
+//        r.addChild(new ColumnTupleFilter(col));
+//        return r;
+//    }
+//
+//    public static CompareTupleFilter compare(TblColRef col, FilterOperatorEnum op, Object... value) {
+//        CompareTupleFilter result = new CompareTupleFilter(op);
+//        result.addChild(new ColumnTupleFilter(col));
+//        result.addChild(new ConstantTupleFilter(Arrays.asList(value)));
+//        return result;
+//    }
+//
+//    public static LogicalTupleFilter and(TupleFilter... children) {
+//        return logic(FilterOperatorEnum.AND, children);
+//    }
+//
+//    public static LogicalTupleFilter or(TupleFilter... children) {
+//        return logic(FilterOperatorEnum.OR, children);
+//    }
+//
+//    public static LogicalTupleFilter not(TupleFilter child) {
+//        return logic(FilterOperatorEnum.NOT, child);
+//    }
+//
+//    public static LogicalTupleFilter logic(FilterOperatorEnum op, TupleFilter... children) {
+//        LogicalTupleFilter result = new LogicalTupleFilter(op);
+//        for (TupleFilter c : children) {
+//            result.addChild(c);
+//        }
+//        return result;
+//    }
+//
+//    public static GridTable newTestTable() throws IOException {
+//        GTInfo info = newInfo();
+//        GTSimpleMemStore store = new GTSimpleMemStore(info);
+//        GridTable table = new GridTable(info, store);
+//
+//        GTRecord r = new GTRecord(table.getInfo());
+//        GTBuilder builder = table.rebuild();
+//
+//        builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+//        builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+//        builder.close();
+//
+//        return table;
+//    }
+//
+//    static GridTable newTestPerfTable() throws IOException {
+//        GTInfo info = newInfo();
+//        GTSimpleMemStore store = new GTSimpleMemStore(info);
+//        GridTable table = new GridTable(info, store);
+//
+//        GTRecord r = new GTRecord(table.getInfo());
+//        GTBuilder builder = table.rebuild();
+//
+//        for (int i = 0; i < 100000; i++) {
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-14", "30", "Yang", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-14", "30", "Luke", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-15", "20", "Dong", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-15", "20", "Jason", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-15", "30", "Xu", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-16", "20", "Mahone", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-16", "20", "Qianhao", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-16", "30", "George", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new Long(10), new BigDecimal("10.5")));
+//
+//            for (int j = 0; j < 10; j++)
+//                builder.write(r.setValues("2015-01-17", "10", "Kejia", new Long(10), new BigDecimal("10.5")));
+//        }
+//        builder.close();
+//
+//        return table;
+//    }
+//
+//    static GTInfo newInfo() {
+//        Builder builder = GTInfo.builder();
+//        builder.setCodeSystem(newDictCodeSystem());
+//        builder.setColumns(//
+//                DataType.getType("timestamp"), //
+//                DataType.getType("integer"), //
+//                DataType.getType("varchar(10)"), //
+//                DataType.getType("bigint"), //
+//                DataType.getType("decimal") //
+//        );
+//        builder.setPrimaryKey(setOf(0, 1));
+//        builder.setColumnPreferIndex(setOf(0));
+//        builder.enableColumnBlock(new ImmutableBitSet[] { setOf(0, 1), setOf(2), setOf(3, 4) });
+//        builder.enableRowBlock(4);
+//        GTInfo info = builder.build();
+//        return info;
+//    }
+//
+//    private static CubeCodeSystem newDictCodeSystem() {
+//        DimensionEncoding[] dimEncs = new DimensionEncoding[3];
+//        dimEncs[1] = new DictionaryDimEnc(newDictionaryOfInteger());
+//        dimEncs[2] = new DictionaryDimEnc(newDictionaryOfString());
+//        return new CubeCodeSystem(dimEncs);
+//    }
+//
+//    private static Dictionary newDictionaryOfString() {
+//        TrieDictionaryBuilder<String> builder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+//        builder.addValue("Dong");
+//        builder.addValue("George");
+//        builder.addValue("Jason");
+//        builder.addValue("Kejia");
+//        builder.addValue("Luke");
+//        builder.addValue("Mahone");
+//        builder.addValue("Qianhao");
+//        builder.addValue("Shaofeng");
+//        builder.addValue("Xu");
+//        builder.addValue("Yang");
+//        return builder.build(0);
+//    }
+//
+//    private static Dictionary newDictionaryOfInteger() {
+//        NumberDictionaryForestBuilder builder = new NumberDictionaryForestBuilder();
+//        builder.addValue("10");
+//        builder.addValue("20");
+//        builder.addValue("30");
+//        builder.addValue("40");
+//        builder.addValue("50");
+//        builder.addValue("60");
+//        builder.addValue("70");
+//        builder.addValue("80");
+//        builder.addValue("90");
+//        builder.addValue("100");
+//        return builder.build();
+//    }
+//
+//    public static ImmutableBitSet setOf(int... values) {
+//        BitSet set = new BitSet();
+//        for (int i : values)
+//            set.set(i);
+//        return new ImmutableBitSet(set);
+//    }
 }
diff --git a/pom.xml b/pom.xml
index 75d33a9..3fe15f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -284,11 +284,11 @@
         <artifactId>kylin-core-metadata</artifactId>
         <version>${project.version}</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.kylin</groupId>
-        <artifactId>kylin-core-dictionary</artifactId>
-        <version>${project.version}</version>
-      </dependency>
+<!--      <dependency>-->
+<!--        <groupId>org.apache.kylin</groupId>-->
+<!--        <artifactId>kylin-core-dictionary</artifactId>-->
+<!--        <version>${project.version}</version>-->
+<!--      </dependency>-->
       <dependency>
         <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-core-cube</artifactId>
@@ -1530,7 +1530,7 @@
     <module>external</module>
     <module>core-common</module>
     <module>core-metadata</module>
-    <module>core-dictionary</module>
+<!--    <module>core-dictionary</module>-->
     <module>core-cube</module>
     <module>core-job</module>
     <module>core-storage</module>
@@ -1602,7 +1602,7 @@
 
   <profiles>
     <profile>
-      <id>sandbox</id>
+      <id>spark2</id>
       <activation>
         <activeByDefault>true</activeByDefault>
         <property>
@@ -1745,114 +1745,6 @@
         </plugins>
       </build>
     </profile>
-    <profile>
-      <id>cdh5.7</id>
-      <properties>
-        <hadoop2.version>2.6.0-cdh5.7.0</hadoop2.version>
-        <yarn.version>2.6.0-cdh5.7.0</yarn.version>
-        <hive.version>1.1.0-cdh5.7.0</hive.version>
-        <hive-hcatalog.version>1.1.0-cdh5.7.0</hive-hcatalog.version>
-        <hbase-hadoop2.version>1.2.0-cdh5.7.0</hbase-hadoop2.version>
-        <zookeeper.version>3.4.5-cdh5.7.0</zookeeper.version>
-      </properties>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-compiler-plugin</artifactId>
-            <configuration>
-              <fork>true</fork>
-              <meminitial>1024m</meminitial>
-              <maxmem>2048m</maxmem>
-            </configuration>
-          </plugin>
-
-    <plugin>
-      <groupId>org.apache.maven.plugins</groupId>
-      <artifactId>maven-dependency-plugin</artifactId>
-      <executions>
-        <execution>
-          <id>copy-jamm</id>
-          <goals>
-            <goal>copy</goal>
-          </goals>
-          <phase>generate-test-resources</phase>
-          <configuration>
-            <artifactItems>
-              <artifactItem>
-                <groupId>com.github.jbellis</groupId>
-                <artifactId>jamm</artifactId>
-                <outputDirectory>${project.build.testOutputDirectory}
-                </outputDirectory>
-                <destFileName>jamm.jar</destFileName>
-              </artifactItem>
-            </artifactItems>
-          </configuration>
-        </execution>
-      </executions>
-    </plugin>
-
-    <plugin>
-      <groupId>org.jacoco</groupId>
-      <artifactId>jacoco-maven-plugin</artifactId>
-      <configuration>
-        <append>true</append>
-        <destFile>
-          ${sonar.jacoco.reportPaths}
-        </destFile>
-      </configuration>
-      <executions>
-        <execution>
-          <id>pre-test</id>
-          <goals>
-            <goal>prepare-agent</goal>
-          </goals>
-          <configuration>
-            <propertyName>surefireArgLine</propertyName>
-          </configuration>
-        </execution>
-        <execution>
-          <id>post-test</id>
-          <phase>test</phase>
-          <goals>
-            <goal>report</goal>
-          </goals>
-        </execution>
-      </executions>
-    </plugin>
-    <plugin>
-      <groupId>org.apache.maven.plugins</groupId>
-      <artifactId>maven-surefire-plugin</artifactId>
-      <version>2.21.0</version>
-      <configuration>
-        <reportsDirectory>${project.basedir}/../target/surefire-reports
-        </reportsDirectory>
-        <excludes>
-          <exclude>**/IT*.java</exclude>
-          <exclude>org.apache.kylin.engine.spark2.NManualBuildAndQueryCuboidTest</exclude>
-          <exclude>org.apache.kylin.engine.spark2.NBuildAndQueryTest</exclude>
-          <exclude>org.apache.kylin.engine.spark2.NBadQueryAndPushDownTest</exclude>
-        </excludes>
-        <systemProperties>
-          <property>
-            <name>buildCubeUsingProvidedData</name>
-            <value>false</value>
-          </property>
-          <property>
-            <name>log4j.configuration</name>
-            <value>
-              file:${project.basedir}/../build/conf/kylin-tools-log4j.properties
-            </value>
-          </property>
-        </systemProperties>
-        <argLine>-javaagent:${project.build.testOutputDirectory}/jamm.jar
-          ${argLine} ${surefireArgLine}
-        </argLine>
-      </configuration>
-    </plugin>
-  </plugins>
-</build>
-</profile>
 <profile>
 <!-- This profile adds/overrides few features of the 'apache-release'
      profile in the parent pom. -->
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
deleted file mode 100644
index ad2e20c..0000000
--- a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.enumerator;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.ILookupTable;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.RealizationEntry;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.schema.OLAPTable;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class LookupTableEnumerator implements Enumerator<Object[]> {
-    private final static Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
-
-    private final ILookupTable lookupTable;
-    private final List<ColumnDesc> colDescs;
-    private final Object[] current;
-    private Iterator<String[]> iterator;
-
-    public LookupTableEnumerator(OLAPContext olapContext) {
-
-        //TODO: assuming LookupTableEnumerator is handled by a cube
-        CubeInstance cube = null;
-
-        if (olapContext.realization instanceof CubeInstance) {
-            cube = (CubeInstance) olapContext.realization;
-            ProjectInstance project = cube.getProjectInstance();
-            List<RealizationEntry> realizationEntries = project.getRealizationEntries();
-            String lookupTableName = olapContext.firstTableScan.getTableName();
-            CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
-
-            // Make force hit cube in lookup table
-            String forceHitCubeName = BackdoorToggles.getForceHitCube();
-            if (!StringUtil.isEmpty(forceHitCubeName)) {
-                String forceHitCubeNameLower = forceHitCubeName.toLowerCase(Locale.ROOT);
-                String[] forceHitCubeNames = forceHitCubeNameLower.split(",");
-                final Set<String> forceHitCubeNameSet = new HashSet<String>(Arrays.asList(forceHitCubeNames));
-                cube = cubeMgr.findLatestSnapshot(
-                        (List<RealizationEntry>) realizationEntries.stream()
-                                .filter(x -> forceHitCubeNameSet.contains(x.getRealization().toLowerCase(Locale.ROOT))),
-                        lookupTableName, cube);
-                olapContext.realization = cube;
-            } else {
-                cube = cubeMgr.findLatestSnapshot(realizationEntries, lookupTableName, cube);
-                olapContext.realization = cube;
-            }
-        } else if (olapContext.realization instanceof HybridInstance) {
-            final HybridInstance hybridInstance = (HybridInstance) olapContext.realization;
-            final IRealization latestRealization = hybridInstance.getLatestRealization();
-            if (latestRealization instanceof CubeInstance) {
-                cube = (CubeInstance) latestRealization;
-            } else {
-                throw new IllegalStateException();
-            }
-        }
-
-        String lookupTableName = olapContext.firstTableScan.getTableName();
-        DimensionDesc dim = cube.getDescriptor().findDimensionByTable(lookupTableName);
-        if (dim == null)
-            throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
-
-        CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
-        this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
-
-        OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
-        this.colDescs = olapTable.getSourceColumns();
-        this.current = new Object[colDescs.size()];
-
-        reset();
-    }
-
-    @Override
-    public boolean moveNext() {
-        boolean hasNext = iterator.hasNext();
-        if (hasNext) {
-            String[] row = iterator.next();
-            for (int i = 0, n = colDescs.size(); i < n; i++) {
-                ColumnDesc colDesc = colDescs.get(i);
-                int colIdx = colDesc.getZeroBasedIndex();
-                if (colIdx >= 0) {
-                    current[i] = Tuple.convertOptiqCellValue(row[colIdx], colDesc.getUpgradedType().getName());
-                } else {
-                    current[i] = null; // fake column
-                }
-            }
-        }
-        return hasNext;
-    }
-
-    @Override
-    public Object[] current() {
-        // NOTE if without the copy, sql_lookup/query03.sql will yields messy result. Very weird coz other lookup queries are all good.
-        return Arrays.copyOf(current, current.length);
-    }
-
-    @Override
-    public void reset() {
-        this.iterator = lookupTable.iterator();
-    }
-
-    @Override
-    public void close() {
-        try {
-            lookupTable.close();
-        } catch (IOException e) {
-            logger.error("error when close lookup table", e);
-        }
-    }
-
-}
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
index c094ff5..6c0b5b1 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java
@@ -64,10 +64,6 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl
         case OLAP:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator()
                     : new OLAPEnumerator(olapContext, optiqContext);
-        case LOOKUP_TABLE:
-            return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new LookupTableEnumerator(olapContext);
-        case COL_DICT:
-            return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new DictionaryEnumerator(olapContext);
         case HIVE:
             return BackdoorToggles.getPrepareOnly() ? new EmptyEnumerator() : new HiveEnumerator(olapContext);
         default:
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index bfd6c4d..6ab86a8 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -38,7 +38,6 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.filter.FilterOptimizeTransformer;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
@@ -55,7 +54,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
 
     ColumnRowType columnRowType;
     OLAPContext context;
-    boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
+    boolean autoJustTimezone = false;
 
     public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
         super(cluster, traits, child, condition);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index ffff10b..dd79a4f 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -37,7 +37,6 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.util.NlsString;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.filter.CaseTupleFilter;
@@ -60,7 +59,6 @@ import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
 
 public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
@@ -70,8 +68,7 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
 
     // is the fact table is a streamingv2 table
     private boolean autoJustByTimezone = false;
-    private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
-            .getRawOffset();
+    private static final long TIME_ZONE_OFFSET = 0;
 
     public TupleFilterVisitor(ColumnRowType inputRowType) {
         super(true);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 00370e3..a8285ae 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
+//import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.cuboid.TreeCuboidScheduler;
@@ -308,20 +308,20 @@ public class CubeController extends BasicController {
      *
      * @throws IOException
      */
-    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
-            RequestMethod.PUT }, produces = { "application/json" })
-    @ResponseBody
-    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
-            @RequestParam(value = "lookupTable") String lookupTable) {
-        try {
-            final CubeManager cubeMgr = cubeService.getCubeManager();
-            final CubeInstance cube = cubeMgr.getCube(cubeName);
-            return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
-        } catch (IOException e) {
-            logger.error(e.getLocalizedMessage(), e);
-            throw new InternalErrorException(e.getLocalizedMessage(), e);
-        }
-    }
+//    @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {
+//            RequestMethod.PUT }, produces = { "application/json" })
+//    @ResponseBody
+//    public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName,
+//            @RequestParam(value = "lookupTable") String lookupTable) {
+//        try {
+//            final CubeManager cubeMgr = cubeService.getCubeManager();
+//            final CubeInstance cube = cubeMgr.getCube(cubeName);
+//            return cubeService.rebuildLookupSnapshot(cube, segmentName, lookupTable);
+//        } catch (IOException e) {
+//            logger.error(e.getLocalizedMessage(), e);
+//            throw new InternalErrorException(e.getLocalizedMessage(), e);
+//        }
+//    }
 
     /**
      * Delete a cube segment
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index bd30109..dd45599 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -64,7 +64,7 @@ import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
+//import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -592,20 +592,20 @@ public class CubeService extends BasicService implements InitializingBean {
         return getCubeManager().updateCube(update);
     }
 
-    public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
-            throws IOException {
-        aclEvaluate.checkProjectOperationPermission(cube);
-        Message msg = MsgPicker.getMsg();
-        TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
-        if (tableDesc.isView()) {
-            throw new BadRequestException(
-                    String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
-        }
-        CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
-        getCubeManager().buildSnapshotTable(seg, lookupTable, null);
-
-        return cube;
-    }
+//    public CubeInstance rebuildLookupSnapshot(CubeInstance cube, String segmentName, String lookupTable)
+//            throws IOException {
+//        aclEvaluate.checkProjectOperationPermission(cube);
+//        Message msg = MsgPicker.getMsg();
+//        TableDesc tableDesc = getTableManager().getTableDesc(lookupTable, cube.getProject());
+//        if (tableDesc.isView()) {
+//            throw new BadRequestException(
+//                    String.format(Locale.ROOT, msg.getREBUILD_SNAPSHOT_OF_VIEW(), tableDesc.getName()));
+//        }
+//        CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY);
+//        getCubeManager().buildSnapshotTable(seg, lookupTable, null);
+//
+//        return cube;
+//    }
 
     public CubeInstance deleteSegmentById(CubeInstance cube, String uuid) throws IOException {
         aclEvaluate.checkProjectWritePermission(cube);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index 6f282ff..966fd1f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -47,9 +47,6 @@ import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.engine.spark.source.CsvSource;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.CsvColumnDesc;
@@ -63,8 +60,6 @@ import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.response.TableSnapshotResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
-import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.apache.kylin.source.ISource;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
@@ -362,38 +357,32 @@ public class TableService extends BasicService {
 //    }
 
     public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
-        TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
-        if (SourceManager.getSource(tableDesc).getClass() == CsvSource.class) {
-            return new ArrayList<>();
-        }
-        IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc, null);
-        TableSignature signature = hiveTable.getSignature();
-        return internalGetLookupTableSnapshots(tableName, signature);
-    }
-
-    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
-            throws IOException {
-        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
-        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
-
-        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
-
-        List<TableSnapshotResponse> result = Lists.newArrayList();
-
-        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
-            TableSnapshotResponse response = new TableSnapshotResponse();
-            response.setSnapshotID(metaStoreTableSnapshot.getId());
-            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
-            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
-            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
-            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
-            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
-            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
-            result.add(response);
-        }
-
-        return result;
+        return Lists.newArrayList();
     }
+//
+//    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature)
+//            throws IOException {
+//        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
+//        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
+//
+//        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
+//
+//        List<TableSnapshotResponse> result = Lists.newArrayList();
+//
+//        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
+//            TableSnapshotResponse response = new TableSnapshotResponse();
+//            response.setSnapshotID(metaStoreTableSnapshot.getId());
+//            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
+//            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
+//            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
+//            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
+//            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
+//            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
+//            result.add(response);
+//        }
+//
+//        return result;
+//    }
 
     /**
      * @return Map of SnapshotID, CubeName or SegmentName list
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 8ff5719..6236ef9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Base64;
-import java.util.Objects;
 import java.util.Set;
 import java.util.Locale;
 import java.util.Collections;
@@ -93,15 +92,15 @@ public class HiveInputBase {
 
             // create global dict
             KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
-            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
-            if (mrHiveDictColumns.length > 0) {
-                String globalDictDatabase = dictConfig.getMrHiveDictDB();
-                if (null == globalDictDatabase) {
-                    throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
-                }
-                String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
-                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
-            }
+//            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+//            if (mrHiveDictColumns.length > 0) {
+//                String globalDictDatabase = dictConfig.getMrHiveDictDB();
+//                if (null == globalDictDatabase) {
+//                    throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
+//                }
+//                String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
+//                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
+//            }
 
             // then count and redistribute
             if (cubeConfig.isHiveRedistributeEnabled()) {
@@ -289,13 +288,13 @@ public class HiveInputBase {
             deleteTables.add(getIntermediateTableIdentity());
 
             // mr-hive dict and inner table do not need delete hdfs
-            String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
-            if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
-                String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
-                String tableName = dictDb + "." + flatDesc.getTableName() + "_"
-                        + MRHiveDictUtil.DictHiveType.GroupBy.getName();
-                deleteTables.add(tableName);
-            }
+//            String[] mrHiveDicts = flatDesc.getSegment().getConfig().getMrHiveDictColumns();
+//            if (Objects.nonNull(mrHiveDicts) && mrHiveDicts.length > 0) {
+//                String dictDb = flatDesc.getSegment().getConfig().getMrHiveDictDB();
+//                String tableName = dictDb + "." + flatDesc.getTableName() + "_"
+//                        + MRHiveDictUtil.DictHiveType.GroupBy.getName();
+//                deleteTables.add(tableName);
+//            }
             step.setIntermediateTables(deleteTables);
 
             step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)));