Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:27 UTC

[kylin] 03/06: KYLIN-3625 Query engine for Parquet and apply CI for Parquet

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c464a38d0a7864abf11e03e55d8b13038b451a12
Author: chao long <wa...@qq.com>
AuthorDate: Sat Sep 29 16:17:58 2018 +0800

    KYLIN-3625 Query engine for Parquet and apply CI for Parquet
---
 build/script/prepare-libs.sh                       |   2 +
 .../java/org/apache/kylin/common/KylinConfig.java  |  60 ++-
 .../java/org/apache/kylin/common/QueryContext.java |  50 +++
 .../org/apache/kylin/common/util/FileUtils.java    |  31 +-
 .../src/main/resources/kylin-defaults.properties   |   6 +-
 .../cube/gridtable/CuboidToGridTableMapping.java   |   7 +
 .../org/apache/kylin/cube/kv/RowKeyColumnIO.java   |   6 +-
 .../org/apache/kylin/cube/kv/RowKeyDecoder.java    |  12 +-
 .../apache/kylin/cube/kv/RowKeyDecoderParquet.java |  27 +-
 .../java/org/apache/kylin/gridtable/GTRecord.java  |  53 ++-
 .../org/apache/kylin/gridtable/GTScanRequest.java  |  33 +-
 .../kylin/gridtable/GTScanRequestBuilder.java      |   8 +-
 .../java/org/apache/kylin/gridtable/GTUtil.java    | 100 ++++-
 .../apache/kylin/measure/MeasureTypeFactory.java   |  11 +-
 .../kylin/measure/bitmap/BitmapAggregator.java     |  10 +-
 .../apache/kylin/measure/hllc/HLLCAggregator.java  |  10 +-
 .../measure/percentile/PercentileAggregator.java   |  10 +-
 .../measure/percentile/PercentileMeasureType.java  |   2 +
 .../apache/kylin/measure/raw/RawAggregator.java    |  11 +-
 .../apache/kylin/measure/topn/TopNAggregator.java  |  13 +-
 .../org/apache/kylin/measure/topn/TopNCounter.java |   5 +
 .../filter/BuiltInFunctionTupleFilter.java         |  40 ++
 .../kylin/metadata/filter/CaseTupleFilter.java     |  15 +
 .../kylin/metadata/filter/ColumnTupleFilter.java   |  11 +
 .../kylin/metadata/filter/CompareTupleFilter.java  |  26 ++
 .../kylin/metadata/filter/ConstantTupleFilter.java |  37 ++
 .../kylin/metadata/filter/DynamicTupleFilter.java  |   5 +
 .../kylin/metadata/filter/ExtractTupleFilter.java  |   5 +
 .../kylin/metadata/filter/LogicalTupleFilter.java  |  25 ++
 .../apache/kylin/metadata/filter/TupleFilter.java  |  45 ++
 .../metadata/filter/UDF/MassInTupleFilter.java     |   5 +
 .../metadata/filter/UnsupportedTupleFilter.java    |   5 +
 .../apache/kylin/metadata/model/ParameterDesc.java |   2 +-
 .../storage/gtrecord/CubeScanRangePlanner.java     |  15 +-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |   4 +
 .../kylin/engine/mr/steps/CuboidReducer.java       |   6 +-
 engine-spark/pom.xml                               |   5 +
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  10 +-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  18 +-
 .../engine/spark/SparkCubingByLayerParquet.java    |  16 +-
 .../java/org/apache/spark/sql/KylinSession.scala   | 231 ++++++++++
 .../java/org/apache/spark/sql/SparderEnv.scala     | 114 +++++
 .../sql/hive/KylinHiveSessionStateBuilder.scala    |  64 +++
 .../org/apache/spark/sql/manager/UdfManager.scala  |  98 +++++
 .../org/apache/spark/sql/udf/SparderAggFun.scala   | 152 +++++++
 .../apache/spark/sql/util/SparderTypeUtil.scala    | 473 +++++++++++++++++++++
 .../org/apache/spark/util/KylinReflectUtils.scala  |  78 ++++
 .../main/java/org/apache/spark/util/XmlUtils.scala | 111 +++++
 .../test_case_data/webapps/META-INF/context.xml    |  38 ++
 kylin-it/jacoco-it.exec                            | Bin 0 -> 2432563 bytes
 kylin-it/pom.xml                                   | 246 ++++++-----
 .../org/apache/kylin/jdbc/ITJDBCDriver2Test.java   |  30 +-
 .../kylin/provision/BuildCubeWithEngine.java       |  83 ++--
 .../org/apache/kylin/query/ITCombination2Test.java |  80 ++++
 .../apache/kylin/query/ITFailfastQuery2Test.java   |  84 ++++
 .../org/apache/kylin/query/ITKylinQuery2Test.java  | 115 +++++
 .../org/apache/kylin/query/ITKylinQueryTest.java   |   8 +-
 kylin-test/pom.xml                                 |  32 ++
 .../main/java/org/apache/kylin/junit/EnvUtils.java |  64 +++
 .../org/apache/kylin/junit/SparkTestRunner.java    |  78 ++++
 .../junit/SparkTestRunnerRunnerWithParameters.java | 147 +++++++
 .../SparkTestRunnerWithParametersFactory.java      |  35 +-
 pom.xml                                            |  18 +
 .../apache/kylin/rest/init/InitialTaskManager.java |  26 ++
 server/pom.xml                                     |   7 +
 .../java/org/apache/kylin/rest/DebugTomcat.java    |  10 +
 server/src/main/webapp/META-INF/context.xml        |  38 ++
 storage-parquet/pom.xml                            | 159 +++++++
 .../kylin/storage/parquet/cube/CubeSparkRPC.java   |   9 +
 .../storage/parquet/cube/CubeStorageQuery.java     |   3 +-
 .../storage/parquet/spark/ParquetPayload.java      | 222 ++++++++++
 .../kylin/storage/parquet/spark/ParquetTask.java   | 308 ++++++++++++++
 .../storage/parquet/spark/SparkSubmitter.java      |  42 ++
 .../spark/gtscanner/ParquetRecordGTScanner.java    | 105 +++++
 .../gtscanner/ParquetRecordGTScanner4Cube.java     |  64 +++
 .../storage/parquet/steps/SparkCubeParquet.java    |  46 +-
 .../org/apache/kylin/ext/ClassLoaderUtils.java     |  89 ++++
 .../apache/kylin/ext/DebugTomcatClassLoader.java   | 152 +++++++
 .../java/org/apache/kylin/ext/ItClassLoader.java   | 175 ++++++++
 .../org/apache/kylin/ext/ItSparkClassLoader.java   | 189 ++++++++
 .../org/apache/kylin/ext/SparkClassLoader.java     | 236 ++++++++++
 .../org/apache/kylin/ext/TomcatClassLoader.java    | 190 +++++++++
 tool/pom.xml                                       |   4 +
 webapp/app/META-INF/context.xml                    |  38 ++
 84 files changed, 4910 insertions(+), 343 deletions(-)

diff --git a/build/script/prepare-libs.sh b/build/script/prepare-libs.sh
index 789a120..45c8150 100644
--- a/build/script/prepare-libs.sh
+++ b/build/script/prepare-libs.sh
@@ -32,6 +32,7 @@ rm -rf build/lib build/tool
 mkdir build/lib build/tool
 cp assembly/target/kylin-assembly-${version}-job.jar build/lib/kylin-job-${version}.jar
 cp storage-hbase/target/kylin-storage-hbase-${version}-coprocessor.jar build/lib/kylin-coprocessor-${version}.jar
+cp storage-parquet/target/kylin-storage-parquet-${version}-spark.jar build/lib/kylin-storage-parquet-${version}.jar
 cp jdbc/target/kylin-jdbc-${version}.jar build/lib/kylin-jdbc-${version}.jar
 cp tool-assembly/target/kylin-tool-assembly-${version}-assembly.jar build/tool/kylin-tool-${version}.jar
 cp datasource-sdk/target/kylin-datasource-sdk-${version}-lib.jar build/lib/kylin-datasource-sdk-${version}.jar
@@ -39,6 +40,7 @@ cp datasource-sdk/target/kylin-datasource-sdk-${version}-lib.jar build/lib/kylin
 # Copied file becomes 000 for some env (e.g. my Cygwin)
 chmod 644 build/lib/kylin-job-${version}.jar
 chmod 644 build/lib/kylin-coprocessor-${version}.jar
+chmod 644 build/lib/kylin-storage-parquet-${version}.jar
 chmod 644 build/lib/kylin-jdbc-${version}.jar
 chmod 644 build/tool/kylin-tool-${version}.jar
 chmod 644 build/lib/kylin-datasource-sdk-${version}.jar
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 4a86b76..f3d3a29 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,6 +18,16 @@
 
 package org.apache.kylin.common;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.FileUtils;
+import org.apache.kylin.common.util.OrderedProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -35,16 +45,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.OrderedProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
 /**
  */
 public class KylinConfig extends KylinConfigBase {
@@ -555,4 +555,44 @@ public class KylinConfig extends KylinConfigBase {
             return this.base() == ((KylinConfig) another).base();
     }
 
+    public Map<String, String> getSparkConf() {
+        return getPropertiesByPrefix("kylin.storage.columnar.spark-conf.");
+    }
+
+    public String getColumnarSparkEnv(String conf) {
+        return getPropertiesByPrefix("kylin.storage.columnar.spark-env.").get(conf);
+    }
+
+    public boolean isParquetSeparateFsEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.columnar.separate-fs-enable", "false"));
+    }
+
+    public String getParquetSeparateOverrideFiles() {
+        return getOptional("kylin.storage.columnar.separate-override-files",
+                "core-site.xml,hdfs-site.xml,yarn-site.xml");
+    }
+
+    public String getSparkCubeGTStorage() {
+        return getOptional("kylin.storage.parquet.gtstorage",
+                "org.apache.kylin.storage.parquet.cube.CubeSparkRPC");
+    }
+
+    public boolean isParquetSparkCleanCachedRDDAfterUse() {
+        return Boolean.parseBoolean(getOptional("kylin.storage.parquet.clean-cached-rdd-after-use", "false"));
+    }
+
+    public String sparderJars() {
+        try {
+            File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/lib",
+                    "kylin-storage-parquet-.*.jar");
+            String path1 = "";
+            if (storageFile != null) {
+                path1 = storageFile.getCanonicalPath();
+            }
+
+            return getOptional("kylin.query.parquet-additional-jars", path1);
+        } catch (IOException e) {
+            return "";
+        }
+    }
 }
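
The new KylinConfig accessors read the Parquet storage settings by property prefix. A minimal sketch of a caller, assuming a configured Kylin environment (the class name and printing loop are illustrative, not part of the commit):

    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;

    public class SparkConfDump {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();

            // Entries under "kylin.storage.columnar.spark-conf.", keyed without
            // the prefix, e.g. "spark.executor.memory" -> "1g".
            Map<String, String> sparkConf = config.getSparkConf();
            for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
                System.out.println(entry.getKey() + " = " + entry.getValue());
            }

            // Jars for the Spark query side: defaults to the kylin-storage-parquet
            // jar under $KYLIN_HOME/lib unless kylin.query.parquet-additional-jars
            // is set.
            System.out.println(config.sparderJars());
        }
    }
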
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index a065a13..49ffab1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -19,10 +19,12 @@
 package org.apache.kylin.common;
 
 import java.io.Serializable;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,6 +47,21 @@ public class QueryContext {
         void stop(QueryContext query);
     }
 
+    private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
+        @Override
+        protected QueryContext initialValue() {
+            return new QueryContext();
+        }
+    };
+
+    public static QueryContext current() {
+        return contexts.get();
+    }
+
+    public static void reset() {
+        contexts.remove();
+    }
+
     private long queryStartMillis;
 
     private final String queryId;
@@ -55,6 +72,10 @@ public class QueryContext {
     private AtomicLong scannedBytes = new AtomicLong();
     private Object calcitePlan;
 
+    private boolean isHighPriorityQuery = false;
+    private Set<Future> allRunningTasks = new HashSet<>();
+    private boolean isTimeout;
+
     private AtomicBoolean isRunning = new AtomicBoolean(true);
     private volatile Throwable throwable;
     private String stopReason;
@@ -82,6 +103,35 @@ public class QueryContext {
         }
     }
 
+    public boolean isHighPriorityQuery() {
+        return isHighPriorityQuery;
+    }
+
+    public void markHighPriorityQuery() {
+        isHighPriorityQuery = true;
+    }
+
+    public void addRunningTasks(Future task) {
+        this.allRunningTasks.add(task);
+    }
+
+    public Set<Future> getAllRunningTasks() {
+        return allRunningTasks;
+    }
+
+    public void removeRunningTask(Future task) {
+        this.allRunningTasks.remove(task);
+    }
+
+    public boolean isTimeout() {
+        return isTimeout;
+    }
+
+    public void setTimeout(boolean timeout) {
+        isTimeout = timeout;
+    }
+
+
     public String getQueryId() {
         return queryId == null ? "" : queryId;
     }
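
QueryContext becomes a per-thread holder for query state, including the Spark tasks launched on behalf of the query. A sketch of the intended lifecycle, assuming one worker thread per query (the method and its body are illustrative):

    import java.util.concurrent.Future;

    import org.apache.kylin.common.QueryContext;

    public class QueryLifecycle {
        static void runQuery(Future<?> sparkJob) {
            QueryContext context = QueryContext.current(); // lazily created per thread
            context.addRunningTasks(sparkJob);
            try {
                // ... execute the query, checking context.isTimeout() at safe points ...
            } finally {
                context.removeRunningTask(sparkJob);
                QueryContext.reset(); // avoid leaking state into the thread's next query
            }
        }
    }
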
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
similarity index 52%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
index 6a3ad59..dd9b9b5 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
@@ -16,28 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.storage.parquet.cube;
+package org.apache.kylin.common.util;
 
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+import java.io.File;
 
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
-
-    public CubeStorageQuery(CubeInstance cube) {
-        super(cube);
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        return super.search(context, sqlDigest, returnTupleInfo);
-    }
-
-    @Override
-    protected String getGTStorage() {
+public final class FileUtils {
+    public static File findFile(String dir, String ptn) {
+        File[] files = new File(dir).listFiles();
+        if (files != null) {
+            for (File f : files) {
+                if (f.getName().matches(ptn))
+                    return f;
+            }
+        }
         return null;
     }
 }
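
FileUtils.findFile does a non-recursive scan of a single directory and returns the first file whose name matches the regex, or null. A usage sketch mirroring sparderJars() above (the KYLIN_HOME lookup is illustrative):

    import java.io.File;

    import org.apache.kylin.common.util.FileUtils;

    public class FindJar {
        public static void main(String[] args) {
            // First file under $KYLIN_HOME/lib matching the pattern, or null if absent.
            File jar = FileUtils.findFile(System.getenv("KYLIN_HOME") + "/lib",
                    "kylin-storage-parquet-.*.jar");
            System.out.println(jar == null ? "not found" : jar.getAbsolutePath());
        }
    }
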
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 6238e44..8115a50 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -146,7 +146,7 @@ kylin.storage.partition.aggr-spill-enabled=true
 
 # The maximum number of bytes each coprocessor is allowed to scan.
 # To allow arbitrary large scan, you can set it to 0.
-kylin.storage.partition.max-scan-bytes=3221225472
+kylin.storage.partition.max-scan-bytes=0
 
 # The default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds,
 # You can set it to a smaller value. 0 means use default.
@@ -361,8 +361,8 @@ kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=
 kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
-kylin.storage.columnar.spark-conf.spark.driver.memory=512m
-kylin.storage.columnar.spark-conf.spark.executor.memory=512m
+kylin.storage.columnar.spark-conf.spark.driver.memory=1g
+kylin.storage.columnar.spark-conf.spark.executor.memory=1g
 kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=512
 kylin.storage.columnar.spark-conf.yarn.am.memory=512m
 kylin.storage.columnar.spark-conf.spark.executor.cores=1
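
These defaults can be overridden per deployment in kylin.properties; anything under the spark-conf prefix is handed through to the Spark session via getSparkConf() above. The values below are illustrative:

    kylin.storage.columnar.spark-conf.spark.driver.memory=2g
    kylin.storage.columnar.spark-conf.spark.executor.memory=4g
    kylin.storage.columnar.spark-conf.spark.executor.cores=2
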
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 05256cc..9052e50 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -51,6 +51,7 @@ public class CuboidToGridTableMapping {
 
     private int nDimensions;
     private Map<TblColRef, Integer> dim2gt;
+    private Map<MeasureDesc, Integer> met2gt;
     private ImmutableBitSet gtPrimaryKey;
 
     private int nMetrics;
@@ -68,6 +69,7 @@ public class CuboidToGridTableMapping {
 
         // dimensions
         dim2gt = Maps.newHashMap();
+        met2gt = Maps.newHashMap();
         BitSet pk = new BitSet();
         for (TblColRef dimension : cuboid.getColumns()) {
             gtDataTypes.add(dimension.getType());
@@ -96,6 +98,7 @@ public class CuboidToGridTableMapping {
             // Ensure the holistic version if exists is always the first.
             FunctionDesc func = measure.getFunction();
             metrics2gt.put(func, gtColIdx);
+            met2gt.put(measure, gtColIdx);
             gtDataTypes.add(func.getReturnDataType());
 
             // map to column block
@@ -245,4 +248,8 @@ public class CuboidToGridTableMapping {
     public Map<TblColRef, Integer> getDim2gt() {
         return ImmutableMap.copyOf(dim2gt);
     }
+
+    public Map<MeasureDesc, Integer> getMet2gt() {
+        return ImmutableMap.copyOf(met2gt);
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
index b0efc91..79cc233 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -56,11 +56,15 @@ public class RowKeyColumnIO implements java.io.Serializable {
         dimEnc.encode(value, output, outputOffset);
     }
 
-    public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
+    public String readColumnStringKeepDicValue(TblColRef col, byte[] bytes, int offset, int length) {
         DimensionEncoding dimEnc = dimEncMap.get(col);
         if (dimEnc instanceof DictionaryDimEnc)
             return String.valueOf(BytesUtil.readUnsigned(bytes, offset, length));
         return dimEnc.decode(bytes, offset, length);
     }
 
+    public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
+        DimensionEncoding dimEnc = dimEncMap.get(col);
+        return dimEnc.decode(bytes, offset, length);
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 71ad4bf..fde7f33 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -36,12 +36,12 @@ import org.apache.kylin.metadata.model.TblColRef;
  */
 public class RowKeyDecoder {
 
-    private final CubeDesc cubeDesc;
-    private final RowKeyColumnIO colIO;
-    private final RowKeySplitter rowKeySplitter;
+    protected final CubeDesc cubeDesc;
+    protected final RowKeyColumnIO colIO;
+    protected final RowKeySplitter rowKeySplitter;
 
-    private Cuboid cuboid;
-    private List<String> values;
+    protected Cuboid cuboid;
+    protected List<String> values;
 
     public RowKeyDecoder(CubeSegment cubeSegment) {
         this.cubeDesc = cubeSegment.getCubeDesc();
@@ -76,7 +76,7 @@ public class RowKeyDecoder {
         this.cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
     }
 
-    private void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
+    protected void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
         String strValue = colIO.readColumnString(col, valueBytes, offset, length);
         values.add(strValue);
     }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoderParquet.java
similarity index 52%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoderParquet.java
index 6a3ad59..9b1f1a5 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoderParquet.java
@@ -16,28 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.storage.parquet.cube;
+package org.apache.kylin.cube.kv;
 
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.TblColRef;
 
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
+import java.io.IOException;
 
-    public CubeStorageQuery(CubeInstance cube) {
-        super(cube);
+public class RowKeyDecoderParquet extends RowKeyDecoder {
+    public RowKeyDecoderParquet(CubeSegment cubeSegment) {
+        super(cubeSegment);
     }
 
     @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        return super.search(context, sqlDigest, returnTupleInfo);
-    }
-
-    @Override
-    protected String getGTStorage() {
-        return null;
+    protected void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
+        String strValue = colIO.readColumnStringKeepDicValue(col, valueBytes, offset, length);
+        values.add(strValue);
     }
 }
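
RowKeyDecoderParquet differs from the base decoder only for dictionary-encoded columns, where it keeps the raw dictionary ID (as a string) instead of the decoded value. A sketch comparing the two, assuming the caller already holds a CubeSegment and an encoded row key:

    import java.io.IOException;
    import java.util.List;

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.cube.kv.RowKeyDecoder;
    import org.apache.kylin.cube.kv.RowKeyDecoderParquet;

    public class DecoderComparison {
        static void decodeBothWays(CubeSegment segment, byte[] rowKey) throws IOException {
            RowKeyDecoder plain = new RowKeyDecoder(segment);
            plain.decode(rowKey);
            List<String> decodedValues = plain.getValues(); // dictionary values decoded

            RowKeyDecoder forParquet = new RowKeyDecoderParquet(segment);
            forParquet.decode(rowKey);
            List<String> dictIds = forParquet.getValues(); // dictionary IDs preserved
        }
    }
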
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index b4a57c7..d7be088 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -18,14 +18,23 @@
 
 package org.apache.kylin.gridtable;
 
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Comparator;
-
+import com.google.common.base.Preconditions;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.measure.bitmap.BitmapSerializer;
+import org.apache.kylin.measure.dim.DimCountDistincSerializer;
+import org.apache.kylin.measure.extendedcolumn.ExtendedColumnSerializer;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.measure.percentile.PercentileSerializer;
+import org.apache.kylin.measure.raw.RawSerializer;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
-import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
 
 public class GTRecord implements Comparable<GTRecord> {
 
@@ -103,6 +112,40 @@ public class GTRecord implements Comparable<GTRecord> {
         return this;
     }
 
+    /** set record to the codes of specified values, reuse given space to hold the codes */
+    public GTRecord setValuesParquet(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
+        assert selectedCols.cardinality() == values.length;
+
+        ByteBuffer buf = space.asBuffer();
+        int pos = buf.position();
+        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+            int c = selectedCols.trueBitAt(i);
+
+            DataTypeSerializer serializer = info.codeSystem.getSerializer(c);
+            if (serializer instanceof DictionaryDimEnc.DictionarySerializer) {
+                int len = serializer.peekLength(buf);
+                BytesUtil.writeUnsigned((Integer) values[i], len, buf);
+                int newPos = buf.position();
+                cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+                pos = newPos;
+            } else if (serializer instanceof TopNCounterSerializer ||
+                    serializer instanceof HLLCSerializer ||
+                    serializer instanceof BitmapSerializer ||
+                    serializer instanceof ExtendedColumnSerializer ||
+                    serializer instanceof PercentileSerializer ||
+                    serializer instanceof DimCountDistincSerializer ||
+                    serializer instanceof RawSerializer) {
+                cols[c].reset((byte[]) values[i], 0, ((byte[]) values[i]).length);
+            } else {
+                info.codeSystem.encodeColumnValue(c, values[i], buf);
+                int newPos = buf.position();
+                cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+                pos = newPos;
+            }
+        }
+        return this;
+    }
+
     /** decode and return the values of this record */
     public Object[] getValues() {
         return getValues(info.colAll, new Object[info.getColumnCount()]);
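
In the dictionary branch above, setValuesParquet writes the dictionary ID with BytesUtil.writeUnsigned at the encoding's fixed byte length, and GTUtil's translate() (further down) reads it back with readUnsigned. A self-contained round trip over a 2-byte encoding:

    import java.nio.ByteBuffer;

    import org.apache.kylin.common.util.BytesUtil;

    public class DictIdRoundTrip {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.allocate(2);
            BytesUtil.writeUnsigned(317, 2, buf); // dictionary ID, fixed 2-byte length
            buf.flip();
            System.out.println(BytesUtil.readUnsigned(buf, 2)); // 317
        }
    }
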
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index e3a2234..b6cf250 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -18,14 +18,9 @@
 
 package org.apache.kylin.gridtable;
 
-import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
@@ -45,9 +40,13 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class GTScanRequest {
 
@@ -71,6 +70,7 @@ public class GTScanRequest {
 
     // optional filtering
     private TupleFilter filterPushDown;
+    private String filterPushDownSQL;
     private TupleFilter havingFilterPushDown;
 
     // optional aggregation
@@ -95,7 +95,7 @@ public class GTScanRequest {
     GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
             ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, ImmutableBitSet rtAggrMetrics, //
             ImmutableBitSet dynamicCols, Map<Integer, TupleExpression> tupleExpressionMap, //
-            TupleFilter filterPushDown, TupleFilter havingFilterPushDown, //
+            TupleFilter filterPushDown, String filterPushDownSQL, TupleFilter havingFilterPushDown, //
             boolean allowStorageAggregation, double aggCacheMemThreshold, int storageScanRowNumThreshold, //
             int storagePushDownLimit, StorageLimitLevel storageLimitLevel, String storageBehavior, long startTime,
             long timeout) {
@@ -107,6 +107,7 @@ public class GTScanRequest {
         }
         this.columns = dimensions;
         this.filterPushDown = filterPushDown;
+        this.filterPushDownSQL = filterPushDownSQL;
         this.havingFilterPushDown = havingFilterPushDown;
 
         this.aggrGroupBy = aggrGroupBy;
@@ -318,6 +319,10 @@ public class GTScanRequest {
         return filterPushDown;
     }
 
+    public String getFilterPushDownSQL() {
+        return filterPushDownSQL;
+    }
+
     public TupleFilter getHavingFilterPushDown() {
         return havingFilterPushDown;
     }
@@ -445,6 +450,7 @@ public class GTScanRequest {
             BytesUtil.writeVLong(value.startTime, out);
             BytesUtil.writeVLong(value.timeout, out);
             BytesUtil.writeUTFString(value.storageBehavior, out);
+            BytesUtil.writeUTFString(value.filterPushDownSQL, out);
 
             // for dynamic related info
             ImmutableBitSet.serializer.serialize(value.dynamicCols, out);
@@ -499,6 +505,7 @@ public class GTScanRequest {
             long startTime = BytesUtil.readVLong(in);
             long timeout = BytesUtil.readVLong(in);
             String storageBehavior = BytesUtil.readUTFString(in);
+            String filterPushDownSQL = BytesUtil.readUTFString(in);
 
             ImmutableBitSet aDynCols = ImmutableBitSet.serializer.deserialize(in);
 
@@ -516,7 +523,7 @@ public class GTScanRequest {
                     .setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs)
                     .setRtAggrMetrics(aRuntimeAggrMetrics).setDynamicColumns(aDynCols)
                     .setExprsPushDown(sTupleExpressionMap)
-                    .setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter)
+                    .setFilterPushDown(sGTFilter).setFilterPushDownSQL(filterPushDownSQL).setHavingFilterPushDown(sGTHavingFilter)
                     .setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB)
                     .setStorageScanRowNumThreshold(storageScanRowNumThreshold)
                     .setStoragePushDownLimit(storagePushDownLimit).setStorageLimitLevel(storageLimitLevel)
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
index 94a89e6..daf766d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -33,6 +33,7 @@ public class GTScanRequestBuilder {
     private GTInfo info;
     private List<GTScanRange> ranges;
     private TupleFilter filterPushDown;
+    private String filterPushDownSQL;
     private TupleFilter havingFilterPushDown;
     private ImmutableBitSet dimensions;
     private ImmutableBitSet aggrGroupBy = null;
@@ -80,6 +81,11 @@ public class GTScanRequestBuilder {
         return this;
     }
 
+    public GTScanRequestBuilder setFilterPushDownSQL(String filterPushDownSQL) {
+        this.filterPushDownSQL = filterPushDownSQL;
+        return this;
+    }
+
     public GTScanRequestBuilder setHavingFilterPushDown(TupleFilter havingFilterPushDown) {
         this.havingFilterPushDown = havingFilterPushDown;
         return this;
@@ -180,7 +186,7 @@ public class GTScanRequestBuilder {
         this.timeout = timeout == -1 ? 300000 : timeout;
 
         return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, rtAggrMetrics,
-                dynamicColumns, exprsPushDown, filterPushDown, havingFilterPushDown, allowStorageAggregation,
+                dynamicColumns, exprsPushDown, filterPushDown, filterPushDownSQL, havingFilterPushDown, allowStorageAggregation,
                 aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageLimitLevel,
                 storageBehavior, startTime, timeout);
     }
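
The SQL form of the pushed-down filter now travels with the scan request and is serialized alongside it (see the GTScanRequest serializer above). A sketch of how a planner might set both representations; GTInfo, ranges, and the filter come from the caller, and toSparkSqlFilter is the new rendering hook on TupleFilter:

    import java.util.List;

    import org.apache.kylin.common.util.ImmutableBitSet;
    import org.apache.kylin.gridtable.GTInfo;
    import org.apache.kylin.gridtable.GTScanRange;
    import org.apache.kylin.gridtable.GTScanRequest;
    import org.apache.kylin.gridtable.GTScanRequestBuilder;
    import org.apache.kylin.metadata.filter.TupleFilter;

    public class ScanRequestSketch {
        static GTScanRequest build(GTInfo info, List<GTScanRange> ranges,
                ImmutableBitSet dimensions, TupleFilter filter) {
            return new GTScanRequestBuilder()
                    .setInfo(info)
                    .setRanges(ranges)
                    .setDimensions(dimensions)
                    .setFilterPushDown(filter)
                    // rendered once on the query server, evaluated by Spark SQL later
                    .setFilterPushDownSQL(filter == null ? null : filter.toSparkSqlFilter())
                    .createGTScanRequest();
        }
    }
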
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
index 298225f..49c68c5 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -24,9 +24,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.dimension.AbstractDateDimEnc;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.FixedLenDimEnc;
+import org.apache.kylin.dimension.IntDimEnc;
+import org.apache.kylin.dimension.IntegerDimEnc;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.expression.TupleExpression;
 import org.apache.kylin.metadata.expression.TupleExpressionSerializer;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -44,6 +52,8 @@ import org.apache.kylin.metadata.model.TblColRef;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import static org.apache.kylin.metadata.filter.ConstantTupleFilter.TRUE;
+
 public class GTUtil {
 
     private GTUtil(){}
@@ -76,8 +86,22 @@ public class GTUtil {
     }
 
     public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
-            Map<TblColRef, Integer> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
-        TupleFilter filter = convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
+                                                               Map<TblColRef, Integer> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
+        return convertFilterColumnsAndConstants(rootFilter, info, colMapping, unevaluatableColumnCollector, false);
+    }
+
+    public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
+            Map<TblColRef, Integer> colMapping, Set<TblColRef> unevaluatableColumnCollector, boolean isParquet) {
+        if (rootFilter == null) {
+            return null;
+        }
+
+        TupleFilter filter;
+        if (isParquet) {
+            filter = convertFilter(rootFilter, new GTConvertDecoratorParquet(unevaluatableColumnCollector, colMapping, info, true));
+        } else {
+            filter = convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
+        }
 
         // optimize the filter: after translating with dictionary, some filters become determined
         // e.g.
@@ -110,6 +134,20 @@ public class GTUtil {
         return TupleFilterSerializer.deserialize(bytes, filterCodeSystem);
     }
 
+    private static TupleFilter convertFilter(TupleFilter rootFilter, GTConvertDecorator decorator) {
+        rootFilter = decorator.onSerialize(rootFilter);
+        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(rootFilter.getChildren().size());
+        if (rootFilter.hasChildren()) {
+            for (TupleFilter childFilter : rootFilter.getChildren()) {
+                newChildren.add(convertFilter(childFilter, decorator));
+
+            }
+            rootFilter.removeAllChildren();
+            rootFilter.addChildren(newChildren);
+        }
+        return rootFilter;
+    }
+
     public static TupleExpression convertFilterColumnsAndConstants(TupleExpression rootExpression, GTInfo info,
             CuboidToGridTableMapping mapping, Set<TblColRef> unevaluatableColumnCollector) {
         Map<TblColRef, FunctionDesc> innerFuncMap = Maps.newHashMap();
@@ -183,6 +221,43 @@ public class GTUtil {
         };
     }
 
+    private static class GTConvertDecoratorParquet extends GTConvertDecorator {
+        public GTConvertDecoratorParquet(Set<TblColRef> unevaluatableColumnCollector, Map<TblColRef, Integer> colMapping,
+                                         GTInfo info, boolean encodeConstants) {
+            super(unevaluatableColumnCollector, colMapping, info, encodeConstants);
+        }
+
+        @Override
+        protected TupleFilter convertColumnFilter(ColumnTupleFilter columnFilter) {
+            return columnFilter;
+        }
+
+        @Override
+        protected Object translate(int col, Object value, int roundingFlag) {
+            try {
+                buf.clear();
+                DimensionEncoding dimEnc = info.codeSystem.getDimEnc(col);
+                info.codeSystem.encodeColumnValue(col, value, roundingFlag, buf);
+                DataTypeSerializer serializer = dimEnc.asDataTypeSerializer();
+                buf.flip();
+                if (dimEnc instanceof DictionaryDimEnc) {
+                    int id = BytesUtil.readUnsigned(buf, dimEnc.getLengthOfEncoding());
+                    return id;
+                } else if (dimEnc instanceof AbstractDateDimEnc) {
+                    return Long.valueOf((String)serializer.deserialize(buf));
+                } else if (dimEnc instanceof FixedLenDimEnc) {
+                    return serializer.deserialize(buf);
+                } else if (dimEnc instanceof IntegerDimEnc || dimEnc instanceof IntDimEnc) {
+                    return Integer.valueOf((String)serializer.deserialize(buf));
+                } else {
+                    return value;
+                }
+            } catch (IllegalArgumentException ex) {
+                return null;
+            }
+        }
+    }
+
     protected static class GTConvertDecorator implements TupleFilterSerializer.Decorator {
         protected final Set<TblColRef> unevaluatableColumnCollector;
         protected final Map<TblColRef, Integer> colMapping;
@@ -190,7 +265,7 @@ public class GTUtil {
         protected final boolean useEncodeConstants;
 
         public GTConvertDecorator(Set<TblColRef> unevaluatableColumnCollector, Map<TblColRef, Integer> colMapping,
-                GTInfo info, boolean encodeConstants) {
+                                  GTInfo info, boolean encodeConstants) {
             this.unevaluatableColumnCollector = unevaluatableColumnCollector;
             this.colMapping = colMapping;
             this.info = info;
@@ -213,20 +288,18 @@ public class GTUtil {
             // will always return FALSE.
             if (filter.getOperator() == FilterOperatorEnum.NOT && !TupleFilter.isEvaluableRecursively(filter)) {
                 TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
-                return ConstantTupleFilter.TRUE;
+                return TRUE;
             }
 
             // shortcut for unEvaluatable filter
             if (!filter.isEvaluable()) {
                 TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
-                return ConstantTupleFilter.TRUE;
+                return TRUE;
             }
 
             // map to column onto grid table
             if (colMapping != null && filter instanceof ColumnTupleFilter) {
-                ColumnTupleFilter colFilter = (ColumnTupleFilter) filter;
-                int gtColIdx = mapCol(colFilter.getColumn());
-                return new ColumnTupleFilter(info.colRef(gtColIdx));
+                return convertColumnFilter((ColumnTupleFilter) filter);
             }
 
             // encode constants
@@ -237,6 +310,11 @@ public class GTUtil {
             return filter;
         }
 
+        protected TupleFilter convertColumnFilter(ColumnTupleFilter columnFilter) {
+            int gtColIdx = mapCol(columnFilter.getColumn());
+            return new ColumnTupleFilter(info.colRef(gtColIdx));
+        }
+
         protected TupleFilter encodeConstants(CompareTupleFilter oldCompareFilter) {
             // extract ColumnFilter & ConstantFilter
             TblColRef externalCol = oldCompareFilter.getColumn();
@@ -261,7 +339,7 @@ public class GTUtil {
             int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : mapCol(externalCol);
 
             TupleFilter result;
-            ByteArray code;
+            Object code;
 
             // translate constant into code
             switch (newCompareFilter.getOperator()) {
@@ -353,7 +431,7 @@ public class GTUtil {
             return result;
         }
 
-        private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef col, ByteArray code) {
+        private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef col, Object code) {
             CompareTupleFilter r = new CompareTupleFilter(op);
             r.addChild(new ColumnTupleFilter(col));
             r.addChild(new ConstantTupleFilter(code));
@@ -368,7 +446,7 @@ public class GTUtil {
 
         transient ByteBuffer buf;
 
-        protected ByteArray translate(int col, Object value, int roundingFlag) {
+        protected Object translate(int col, Object value, int roundingFlag) {
             try {
                 buf.clear();
                 info.codeSystem.encodeColumnValue(col, value, roundingFlag, buf);
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index d16a705..621a13e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -18,10 +18,8 @@
 
 package org.apache.kylin.measure;
 
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigCannotInitException;
 import org.apache.kylin.measure.basic.BasicMeasureType;
@@ -38,8 +36,9 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 
 /**
  * Factory for MeasureType.
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
index 19fa49e..d57af48 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
@@ -45,8 +45,14 @@ public class BitmapAggregator extends MeasureAggregator<BitmapCounter> {
 
     @Override
     public BitmapCounter aggregate(BitmapCounter value1, BitmapCounter value2) {
-        value1.orWith(value2);
-        return value1;
+        BitmapCounter merged = bitmapFactory.newBitmap();
+        if (value1 != null) {
+            merged.orWith(value1);
+        }
+        if (value2 != null) {
+            merged.orWith(value2);
+        }
+        return merged;
     }
 
     @Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
index 4e09265..a134f92 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
@@ -47,8 +47,14 @@ public class HLLCAggregator extends MeasureAggregator<HLLCounter> {
 
     @Override
     public HLLCounter aggregate(HLLCounter value1, HLLCounter value2) {
-        value1.merge(value2);
-        return value1;
+        if (value1 == null) {
+            return new HLLCounter(value2);
+        } else if (value2 == null) {
+            return new HLLCounter(value1);
+        }
+        HLLCounter result = new HLLCounter(value1);
+        result.merge(value2);
+        return result;
     }
 
     @Override
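
The aggregator changes in this commit (Bitmap above, HLLC here, and Percentile, Raw, and TopN below) all follow one pattern: aggregate() no longer mutates value1 in place but returns a freshly merged object, so an input counter can safely be fed into more than one aggregation. A small demonstration with HLLC:

    import org.apache.kylin.measure.hllc.HLLCAggregator;
    import org.apache.kylin.measure.hllc.HLLCounter;

    public class NonMutatingMerge {
        public static void main(String[] args) {
            HLLCounter a = new HLLCounter(14);
            a.add("user-1");
            HLLCounter b = new HLLCounter(14);
            b.add("user-2");

            HLLCAggregator aggregator = new HLLCAggregator(14);
            HLLCounter merged = aggregator.aggregate(a, b);

            // a and b still estimate ~1 each; merged estimates ~2. Before this
            // change, aggregate() folded b into a, corrupting a for later reuse.
            System.out.println(a.getCountEstimate() + " " + b.getCountEstimate()
                    + " " + merged.getCountEstimate());
        }
    }
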
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
index ef8896a..b7185e3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
@@ -43,8 +43,14 @@ public class PercentileAggregator extends MeasureAggregator<PercentileCounter> {
 
     @Override
     public PercentileCounter aggregate(PercentileCounter value1, PercentileCounter value2) {
-        value1.merge(value2);
-        return value1;
+        if (value1 == null) {
+            return new PercentileCounter(value2);
+        } else if (value2 == null) {
+            return new PercentileCounter(value1);
+        }
+        PercentileCounter merged = new PercentileCounter(value1);
+        merged.merge(value2);
+        return merged;
     }
 
     @Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
index 60b3282..9534ecf 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
@@ -108,4 +108,6 @@ public class PercentileMeasureType extends MeasureType<PercentileCounter> {
     public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
         return UDAF_MAP;
     }
+
+
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
index 2ee36e3..9cc539a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
@@ -21,6 +21,7 @@ package org.apache.kylin.measure.raw;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.measure.MeasureAggregator;
 
@@ -49,13 +50,15 @@ public class RawAggregator extends MeasureAggregator<List<ByteArray>> {
     @Override
     public List<ByteArray> aggregate(List<ByteArray> value1, List<ByteArray> value2) {
         if (value1 == null) {
-            return value2;
+            return Lists.newArrayList(value2);
         } else if (value2 == null) {
-            return value1;
+            return Lists.newArrayList(value1);
         }
 
-        value1.addAll(value2);
-        return value1;
+        List<ByteArray> result = new ArrayList<>(value1.size() + value2.size());
+        result.addAll(value1);
+        result.addAll(value2);
+        return result;
     }
 
     @Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
index 34ceb9c..ec8bca7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -46,8 +46,17 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
 
     @Override
     public TopNCounter<ByteArray> aggregate(TopNCounter<ByteArray> value1, TopNCounter<ByteArray> value2) {
-        value1.merge(value2);
-        return value1;
+        if (value1 == null) {
+            return new TopNCounter<>(value2);
+        } else if (value2 == null) {
+            return new TopNCounter<>(value1);
+        }
+        int thisCapacity = value1.getCapacity();
+        TopNCounter<ByteArray> aggregated = new TopNCounter<>(thisCapacity * 2);
+        aggregated.merge(value1);
+        aggregated.merge(value2);
+        aggregated.retain(thisCapacity);
+        return aggregated;
     }
 
     @Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index 932248d..84debeb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -59,6 +59,11 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
         counterList = Lists.newLinkedList();
     }
 
+    public TopNCounter(TopNCounter another) {
+        this(another.capacity);
+        merge(another);
+    }
+
     public int getCapacity() {
         return capacity;
     }
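
The new copy constructor backs the null-handling branches in TopNAggregator above; the copy is independent of its source. A minimal check:

    import org.apache.kylin.measure.topn.TopNCounter;

    public class TopNCopy {
        public static void main(String[] args) {
            TopNCounter<String> original = new TopNCounter<>(50);
            original.offer("item-a");

            TopNCounter<String> copy = new TopNCounter<>(original);
            copy.offer("item-b"); // does not affect the original counter
        }
    }
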
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
index 9082c1f..38cbd66 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
@@ -177,6 +177,46 @@ public class BuiltInFunctionTupleFilter extends FunctionTupleFilter {
     }
 
     @Override
+    public String toSparkSqlFilter() {
+        List<? extends TupleFilter> childFilter = this.getChildren();
+        String op = this.getName();
+        switch (op) {
+            case "LIKE":
+                assert childFilter.size() == 2;
+                return childFilter.get(0).toSparkSqlFilter() + toSparkFuncMap.get(op) + childFilter.get(1).toSparkSqlFilter();
+            case "||":
+                StringBuilder result = new StringBuilder().append(toSparkFuncMap.get(op)).append("(");
+                int index = 0;
+                for (TupleFilter filter : childFilter) {
+                    result.append(filter.toSparkSqlFilter());
+                    if (index < childFilter.size() - 1) {
+                        result.append(",");
+                    }
+                    index++;
+                }
+                result.append(")");
+                return result.toString();
+            case "LOWER":
+            case "UPPER":
+            case "CHAR_LENGTH":
+                assert childFilter.size() == 1;
+                return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSparkSqlFilter() + ")";
+            case "SUBSTRING":
+                assert childFilter.size() == 3;
+                return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSparkSqlFilter() + "," + childFilter.get(1).toSparkSqlFilter() + "," + childFilter.get(2).toSparkSqlFilter() + ")";
+            default:
+                if (childFilter.size() == 1) {
+                    return op + "(" + childFilter.get(0).toSparkSqlFilter() + ")";
+                } else if (childFilter.size() == 2) {
+                    return childFilter.get(0).toSparkSqlFilter() + op + childFilter.get(1).toSparkSqlFilter();
+                } else if (childFilter.size() == 3) {
+                    return op + "(" + childFilter.get(0).toSparkSqlFilter() + "," + childFilter.get(1).toSparkSqlFilter() + "," + childFilter.get(2).toSparkSqlFilter() + ")";
+                }
+                throw new IllegalArgumentException("Operator " + op + " is not supported");
+        }
+    }
+
+    @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         if (isReversed)
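
toSparkSqlFilter renders a filter subtree as a Spark SQL fragment; the operator and function names come from toSparkOpMap/toSparkFuncMap in TupleFilter.java, whose hunk is not shown in this excerpt. A sketch of the one-argument case; a real plan would pass a ColumnTupleFilter child, but a constant keeps it self-contained:

    import org.apache.kylin.metadata.filter.BuiltInFunctionTupleFilter;
    import org.apache.kylin.metadata.filter.ConstantTupleFilter;

    public class FuncToSparkSql {
        public static void main(String[] args) {
            BuiltInFunctionTupleFilter upper = new BuiltInFunctionTupleFilter("UPPER");
            upper.addChild(new ConstantTupleFilter("abc"));
            // Expected to print something like: upper('abc'), depending on the
            // function name registered in toSparkFuncMap.
            System.out.println(upper.toSparkSqlFilter());
        }
    }
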
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 9083212..4305557 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -134,6 +134,21 @@ public class CaseTupleFilter extends TupleFilter implements IOptimizeableTupleFi
     }
 
     @Override
+    public String toSparkSqlFilter() {
+        String result = "(case ";
+        TupleFilter whenFilter;
+        TupleFilter thenFilter;
+        for (int i = 0; i < this.getWhenFilters().size(); i++) {
+            whenFilter = this.getWhenFilters().get(i);
+            thenFilter = this.getThenFilters().get(i);
+            result += " when " + whenFilter.toSparkSqlFilter() + " then " + thenFilter.toSparkSqlFilter();
+        }
+        result += " else " + this.getElseFilter().toSparkSqlFilter();
+        result += " end)";
+        return result;
+    }
+
+    @Override
     public TupleFilter acceptOptimizeTransformer(FilterOptimizeTransformer transformer) {
         List<TupleFilter> newChildren = Lists.newArrayList();
         for (TupleFilter child : this.getChildren()) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 6d3d541..09a16f5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -46,12 +46,18 @@ public class ColumnTupleFilter extends TupleFilter {
     private TblColRef columnRef;
     private Object tupleValue;
     private List<Object> values;
+    private String colName;
 
     public ColumnTupleFilter(TblColRef column) {
+        this(column, null);
+    }
+
+    public ColumnTupleFilter(TblColRef column, String colName) {
         super(Collections.<TupleFilter> emptyList(), FilterOperatorEnum.COLUMN);
         this.columnRef = column;
         this.values = new ArrayList<Object>(1);
         this.values.add(null);
+        this.colName = colName;
     }
 
     public TblColRef getColumn() {
@@ -155,4 +161,9 @@ public class ColumnTupleFilter extends TupleFilter {
             this.columnRef = column.getRef();
         }
     }
+
+    @Override
+    public String toSparkSqlFilter() {
+        return this.columnRef.getTableAlias() + "_" + this.columnRef.getName();
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 1c1c409..b63ac0a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -279,6 +280,31 @@ public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupl
     }
 
     @Override
+    public String toSparkSqlFilter() {
+        List<? extends TupleFilter> childFilter = this.getChildren();
+        switch (this.getOperator()) {
+            case EQ:
+            case NEQ:
+            case LT:
+            case GT:
+            case GTE:
+            case LTE:
+                assert childFilter.size() == 2;
+                return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator()) + childFilter.get(1).toSparkSqlFilter();
+            case IN:
+            case NOTIN:
+                assert childFilter.size() == 2;
+                return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator()) + "(" + childFilter.get(1).toSparkSqlFilter() + ")";
+            case ISNULL:
+            case ISNOTNULL:
+                assert childFilter.size() == 1;
+                return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator());
+            default:
+                throw new IllegalStateException("operator " + this.getOperator() + " is not supported");
+        }
+    }
+
+    @Override
     public TupleFilter acceptOptimizeTransformer(FilterOptimizeTransformer transformer) {
         return transformer.visit(this);
     }
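
The comparison branches follow the same scheme: children rendered around the operator string from toSparkOpMap. A runnable sketch of the EQ branch; real plans put a ColumnTupleFilter on the left, constants keep the example self-contained:

    import org.apache.kylin.metadata.filter.CompareTupleFilter;
    import org.apache.kylin.metadata.filter.ConstantTupleFilter;
    import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;

    public class CompareToSparkSql {
        public static void main(String[] args) {
            CompareTupleFilter eq = new CompareTupleFilter(FilterOperatorEnum.EQ);
            eq.addChild(new ConstantTupleFilter("CN"));
            eq.addChild(new ConstantTupleFilter("CN"));
            // Expected to print something like: 'CN' = 'CN', depending on the
            // operator string registered in toSparkOpMap.
            System.out.println(eq.toSparkSqlFilter());
        }
    }
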
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index e4f8b2e..e9ecf16 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.filter;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
@@ -88,6 +89,10 @@ public class ConstantTupleFilter extends TupleFilter {
         return this.constantValues;
     }
 
+    public void setValues(List<Object> constantValues) {
+        this.constantValues = constantValues;
+    }
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void serialize(IFilterCodeSystem cs, ByteBuffer buffer) {
@@ -108,6 +113,38 @@ public class ConstantTupleFilter extends TupleFilter {
         }
     }
 
+    @Override
+    public String toSparkSqlFilter() {
+        if (this.equals(TRUE)) {
+            return "true";
+        } else if (this.equals(FALSE)) {
+            return "false";
+        }
+
+        StringBuilder sb = new StringBuilder("");
+
+        if (constantValues.isEmpty()) {
+            sb.append("null");
+        } else {
+            for (Object value : constantValues) {
+                if (value == null) {
+                    sb.append("null");
+                } else if (value instanceof String) {
+                    // quote strings so they remain valid Spark SQL literals
+                    sb.append("'" + value + "'");
+                } else {
+                    sb.append(value);
+                }
+                sb.append(",");
+            }
+        }
+        String result = sb.toString();
+        if (result.endsWith(",")) {
+            result = result.substring(0, result.length() - 1);
+        }
+        return result;
+    }
+
     @Override public boolean equals(Object o) {
         if (this == o)
             return true;
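The constant side joins the values with commas, quoting strings and rendering
nulls as the literal "null"; this is exactly the shape the IN/NOT IN branch
above wraps in parentheses. A small sketch of the expected output:

    import java.util.Arrays;

    import org.apache.kylin.metadata.filter.ConstantTupleFilter;

    public class ConstantFilterDemo {
        public static void main(String[] args) {
            // prints "'FP-GTC','ABIN'"
            System.out.println(
                    new ConstantTupleFilter(Arrays.asList("FP-GTC", "ABIN")).toSparkSqlFilter());
            // prints "125" -- non-strings are emitted unquoted
            System.out.println(new ConstantTupleFilter(125).toSparkSqlFilter());
        }
    }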
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index d9dc52a..c4490e5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -78,4 +78,9 @@ public class DynamicTupleFilter extends TupleFilter {
         this.variableName = BytesUtil.readUTFString(buffer);
     }
 
+    @Override
+    public String toSparkSqlFilter() {
+        return "1=1";
+    }
+
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index 8c2ba94..36ea021 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -122,4 +122,9 @@ public class ExtractTupleFilter extends TupleFilter {
     public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
     }
 
+    @Override
+    public String toSparkSqlFilter() {
+        return "1=1";
+    }
+
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index f0c825f..99cf3f3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -155,6 +155,31 @@ public class LogicalTupleFilter extends TupleFilter implements IOptimizeableTupl
     }
 
     @Override
+    public String toSparkSqlFilter() {
+        StringBuilder result = new StringBuilder("");
+        switch (this.getOperator()) {
+            case AND:
+            case OR:
+                result.append("(");
+                String op = toSparkOpMap.get(this.getOperator());
+                int index = 0;
+                for (TupleFilter filter : this.getChildren()) {
+                    result.append(filter.toSparkSqlFilter());
+                    if (index < this.getChildren().size() - 1) {
+                        result.append(op);
+                    }
+                    index++;
+                }
+                result.append(")");
+                break;
+            default:
+                throw new IllegalArgumentException("Operator " + this.getOperator() + " is not supported");
+        }
+
+        return result.toString();
+    }
+
+    @Override
     public TupleFilter acceptOptimizeTransformer(FilterOptimizeTransformer transformer) {
         List<TupleFilter> newChildren = Lists.newArrayList();
         for (TupleFilter child : this.getChildren()) {
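Logical nodes parenthesize themselves and join their children with the mapped
operator, so nested trees keep their grouping when rendered. Continuing the
compare-filter sketch from above (with eq and in as built there):

    import org.apache.kylin.metadata.filter.LogicalTupleFilter;
    import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;

    LogicalTupleFilter and = new LogicalTupleFilter(FilterOperatorEnum.AND);
    and.addChild(eq);
    and.addChild(in);
    // prints "(<alias>_SELLER_ID = 10000001 and <alias>_SELLER_ID in (1,2))"
    System.out.println(and.toSparkSqlFilter());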
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 672aba0..28a8c6c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
 import org.slf4j.Logger;
@@ -34,6 +35,19 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.AND;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.EQ;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.GT;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.GTE;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.IN;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.ISNOTNULL;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.ISNULL;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.LT;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.LTE;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.NEQ;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.NOTIN;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.OR;
+
 /**
  * 
  * @author xjiang
@@ -43,6 +57,35 @@ public abstract class TupleFilter {
 
     static final Logger logger = LoggerFactory.getLogger(TupleFilter.class);
 
+    public static final Map<TupleFilter.FilterOperatorEnum, String> toSparkOpMap = ImmutableMap.<TupleFilter.FilterOperatorEnum, String>builder()
+            .put(EQ, " = ")
+            .put(NEQ, " != ")
+            .put(LT, " < ")
+            .put(GT, " > ")
+            .put(GTE, " >= ")
+            .put(LTE, " <= ")
+            .put(ISNULL, " is null")
+            .put(ISNOTNULL, " is not null")
+            .put(IN, " in ")
+            .put(NOTIN, " not in ")
+            .put(AND, " and ")
+            .put(OR, " or ")
+            .build();
+
+    //TODO all function mapping
+    public static final Map<String, String> toSparkFuncMap = ImmutableMap.<String, String>builder()
+            .put("LOWER", "LOWER")
+            .put("UPPER", "UPPER")
+            .put("CHAR_LENGTH", "LENGTH")
+            .put("SUBSTRING", "SUBSTRING")
+            .put("LIKE", " LIKE ")
+            .put("||", "CONCAT")
+            .build();
+
+    public void removeAllChildren() {
+        this.children.clear();
+    }
+
     public enum FilterOperatorEnum {
         EQ(1), NEQ(2), GT(3), LT(4), GTE(5), LTE(6), ISNULL(7), ISNOTNULL(8), IN(9), NOTIN(10), AND(20), OR(21), NOT(22), COLUMN(30), CONSTANT(31), DYNAMIC(32), EXTRACT(33), CASE(34), FUNCTION(35), MASSIN(36), EVAL_FUNC(37), UNSUPPORTED(38);
 
@@ -378,4 +421,6 @@ public abstract class TupleFilter {
         }
     }
 
+    public abstract String toSparkSqlFilter();
+
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
index d0ff92e..b153003 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
@@ -153,6 +153,11 @@ public class MassInTupleFilter extends FunctionTupleFilter {
         reverse = Boolean.parseBoolean(BytesUtil.readUTFString(buffer));
     }
 
+    @Override
+    public String toSparkSqlFilter() {
+        return "1=1";
+    }
+
     public static boolean containsMassInTupleFilter(TupleFilter filter) {
         if (filter == null)
             return false;
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
index 85605d4..143ee6d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
@@ -56,4 +56,9 @@ public class UnsupportedTupleFilter extends TupleFilter {
     @Override
     public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
     }
+
+    @Override
+    public String toSparkSqlFilter() {
+        return "1=1";
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index f757503..c3bb5a3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -195,7 +195,7 @@ public class ParameterDesc implements Serializable {
      * 1. easy to compare without considering order
      * 2. easy to compare one by one
      */
-    private static class PlainParameter {
+    private static class PlainParameter implements Serializable {
         private String type;
         private String value;
         private TblColRef colRef = null;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 229ef01..60fe33f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -72,6 +72,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
     protected CubeSegment cubeSegment;
     protected CubeDesc cubeDesc;
     protected Cuboid cuboid;
+    protected String filterPushDownSQL;
+    protected CuboidToGridTableMapping mapping;
 
     public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, //
             Set<TblColRef> groupByDims, List<TblColRef> dynGroupsDims, List<TupleExpression> dynGroupExprs, //
@@ -87,7 +89,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.cubeDesc = cubeSegment.getCubeDesc();
         this.cuboid = cuboid;
 
-        final CuboidToGridTableMapping mapping = context.getMapping();
+        mapping = context.getMapping();
 
         this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment), mapping);
 
@@ -98,13 +100,21 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp);
         //start key GTRecord compare to stop key GTRecord
         this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
-
         //replace the constant values in filter to dictionary codes
         Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims);
         groupByPushDown.addAll(dynGroupsDims);
+
         this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown);
+
+        TupleFilter convertedFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown, true);
+
         this.havingFilter = havingFilter;
 
+        if (convertedFilter != null) {
+            this.filterPushDownSQL = convertedFilter.toSparkSqlFilter();
+            logger.info("--filterPushDownSQL--: {}", this.filterPushDownSQL);
+        }
+
         this.gtDimensions = mapping.makeGridTableColumns(dimensions);
         this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
         this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
@@ -171,6 +181,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
             scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions)
                     .setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs)
                     .setFilterPushDown(gtFilter)//
+                    .setFilterPushDownSQL(filterPushDownSQL)
                     .setRtAggrMetrics(gtRtAggrMetrics).setDynamicColumns(gtDynColumns)
                     .setExprsPushDown(tupleExpressionMap)//
                     .setAllowStorageAggregation(context.isNeedStorageAggregation())
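The resulting string is plain Spark SQL over the flattened cuboid columns. The
actual read path lives in the storage-parquet module, so the following is only a
hypothetical illustration of how such a push-down string can be applied when
scanning the Parquet cuboid files:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparderEnv;
    import org.apache.spark.sql.SparkSession;

    // Sketch only; cuboidPath and the session wiring are assumptions.
    static Dataset<Row> scanCuboid(String cuboidPath, String filterPushDownSQL) {
        SparkSession spark = SparderEnv.getSparkSession();
        // e.g. filterPushDownSQL = "KYLIN_SALES_PART_DT = '2012-01-01'"
        return spark.read().parquet(cuboidPath).where(filterPushDownSQL);
    }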
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index d21638a..fd11c99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -287,6 +287,10 @@ public class JobBuilderSupport {
         return getCuboidRootPath(seg.getLastBuildJobID());
     }
 
+    public String getCuboidRootPath() {
+        return getCuboidRootPath(seg.getLastBuildJobID());
+    }
+
     public void appendMapReduceParameters(StringBuilder buf) {
         appendMapReduceParameters(buf, JobEngineConfig.DEFAULT_JOB_CONF_SUFFIX);
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index a7fa2cd..e0652ca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * @author George Song (ysong1)
- * 
+ *
  */
 public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
 
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 26a3ad7..7bb4a52 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -83,6 +83,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-tomcat-ext</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 62ccf03..94333a2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -18,9 +18,6 @@
 
 package org.apache.kylin.engine.spark;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.StringUtil;
@@ -35,7 +32,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.kylin.metadata.model.IStorageAware.ID_PARQUET;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  */
@@ -144,10 +142,6 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
-
-        if (seg.getStorageType() == ID_PARQUET) {
-            sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOutputPath(jobId));
-        }
     }
 
     public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 99e6a67..0d5ee37 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,14 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
 import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
@@ -69,13 +61,19 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.SQLContext;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Tuple2;
 
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
 /**
  * Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
  */
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
index d8fccf9..154c058 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
@@ -56,6 +56,8 @@ import org.apache.kylin.measure.basic.BasicMeasureType;
 import org.apache.kylin.measure.basic.BigDecimalIngester;
 import org.apache.kylin.measure.basic.DoubleIngester;
 import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.measure.percentile.PercentileSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.parquet.example.data.Group;
@@ -75,6 +77,7 @@ import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -264,6 +267,8 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
         private Map<MeasureDesc, String> meaTypeMap;
         private GroupFactory factory;
         private BufferedMeasureCodec measureCodec;
+        private PercentileSerializer serializer;
+        private ByteBuffer byteBuffer;
 
         public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
             this.cubeName = cubeName;
@@ -284,6 +289,8 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
             decoder = new RowKeyDecoder(cubeSegment);
             factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
             measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+            serializer = new PercentileSerializer(DataType.getType("percentile(100)"));
+
         }
 
         @Override
@@ -319,7 +326,7 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
             int valueOffset = 0;
             for (int i = 0; i < valueLengths.length; ++i) {
                 MeasureDesc measureDesc = measureDescs.get(i);
-                parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i]);
+                parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i], tuple._2[i]);
                 valueOffset += valueLengths[i];
             }
 
@@ -340,7 +347,7 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
             }
         }
 
-        private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) {
+        private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length, final Object d) {
             switch (meaTypeMap.get(measureDesc)) {
                 case "long":
                     group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
@@ -348,6 +355,11 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
                 case "double":
                     group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
                     break;
+                case "decimal":
+                    BigDecimal decimal = (BigDecimal)d;
+                    decimal = decimal.setScale(4);
+                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
+                    break;
                 default:
                     group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
                     break;
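Decimal measures are thus stored as the unscaled value's two's-complement bytes
after normalizing to scale 4 (Kylin's default decimal scale, so setScale(4)
should not need to round here; if a value carried more than four decimal places
it would throw ArithmeticException). A worked example of the byte layout:

    import java.math.BigDecimal;
    import java.math.BigInteger;

    BigDecimal d = new BigDecimal("12.5").setScale(4); // 12.5000, unscaled value 125000
    byte[] bytes = d.unscaledValue().toByteArray();    // {0x01, 0xE8, 0x48}
    // A reader treating the column as DECIMAL(p, 4) reverses it:
    BigDecimal back = new BigDecimal(new BigInteger(bytes), 4); // 12.5000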
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
new file mode 100644
index 0000000..20f86e5
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.file.Paths
+import java.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.manager.UdfManager
+import org.apache.spark.util.{KylinReflectUtils, XmlUtils}
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.JavaConverters._
+
+class KylinSession(
+                    @transient val sc: SparkContext,
+                    @transient private val existingSharedState: Option[SharedState])
+  extends SparkSession(sc) {
+
+  def this(sc: SparkContext) {
+    this(sc, None)
+  }
+
+//  @transient
+//  override lazy val sessionState: SessionState =
+//    KylinReflectUtils.getSessionState(sc, this).asInstanceOf[SessionState]
+
+  override def newSession(): SparkSession = {
+    new KylinSession(sparkContext, Some(sharedState))
+  }
+}
+
+object KylinSession extends Logging {
+
+  implicit class KylinBuilder(builder: Builder) {
+    def getOrCreateKylinSession(): SparkSession = synchronized {
+      val options =
+        getValue("options", builder)
+          .asInstanceOf[scala.collection.mutable.HashMap[String, String]]
+      val userSuppliedContext: Option[SparkContext] =
+        getValue("userSuppliedContext", builder)
+          .asInstanceOf[Option[SparkContext]]
+      var session: SparkSession = SparkSession.getActiveSession match {
+        case Some(sparkSession: KylinSession) =>
+          if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+            options.foreach {
+              case (k, v) => sparkSession.sessionState.conf.setConfString(k, v)
+            }
+            sparkSession
+          } else {
+            null
+          }
+        case _ => null
+      }
+      if (session ne null) {
+        return session
+      }
+      // Global synchronization so we will only set the default session once.
+      SparkSession.synchronized {
+        // If the current thread does not have an active session, get it from the global session.
+        session = SparkSession.getDefaultSession match {
+          case Some(sparkSession: KylinSession) =>
+            if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+              sparkSession
+            } else {
+              null
+            }
+          case _ => null
+        }
+        if (session ne null) {
+          return session
+        }
+        val sparkContext = userSuppliedContext.getOrElse {
+          // set app name if not given
+          val sparkConf = initSparkConf()
+          options.foreach { case (k, v) => sparkConf.set(k, v) }
+          val sc = SparkContext.getOrCreate(sparkConf)
+          // maybe this is an existing SparkContext, update its SparkConf which maybe used
+          // by SparkSession
+          sc
+        }
+        session = new KylinSession(sparkContext)
+        SparkSession.setDefaultSession(session)
+        sparkContext.addSparkListener(new SparkListener {
+          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+            SparkSession.setDefaultSession(null)
+            SparkSession.sqlListener.set(null)
+          }
+        })
+        UdfManager.create(session)
+        session
+      }
+    }
+
+    def getValue(name: String, builder: Builder): Any = {
+      val currentMirror = scala.reflect.runtime.currentMirror
+      val instanceMirror = currentMirror.reflect(builder)
+      val m = currentMirror
+        .classSymbol(builder.getClass)
+        .toType
+        .members
+        .find { p =>
+          p.name.toString.equals(name)
+        }
+        .get
+        .asTerm
+      instanceMirror.reflectField(m).get
+    }
+
+    private lazy val kylinConfig: KylinConfig = KylinConfig.getInstanceFromEnv
+
+    def initSparkConf(): SparkConf = {
+      val sparkConf = new SparkConf()
+
+      kylinConfig.getSparkConf.asScala.foreach {
+        case (k, v) =>
+          sparkConf.set(k, v)
+      }
+      if (KylinConfig.getInstanceFromEnv.isParquetSeparateFsEnabled) {
+        logInfo("ParquetSeparateFs is enabled : begin override read cluster conf to sparkConf")
+        addReadConfToSparkConf(sparkConf)
+      }
+      val instances = sparkConf.get("spark.executor.instances").toInt
+      val cores = sparkConf.get("spark.executor.cores").toInt
+      val sparkCores = instances * cores
+      if (sparkConf.get("spark.sql.shuffle.partitions", "").isEmpty) {
+        sparkConf.set("spark.sql.shuffle.partitions", sparkCores.toString)
+      }
+      sparkConf.set("spark.sql.session.timeZone", "UTC")
+      sparkConf.set("spark.debug.maxToStringFields", "1000")
+      sparkConf.set("spark.scheduler.mode", "FAIR")
+      if (new File(KylinConfig.getKylinConfDir.getCanonicalPath + "/fairscheduler.xml")
+        .exists()) {
+        val fairScheduler = KylinConfig.getKylinConfDir.getCanonicalPath + "/fairscheduler.xml"
+        sparkConf.set("spark.scheduler.allocation.file", fairScheduler)
+      }
+
+      if (!"true".equalsIgnoreCase(System.getProperty("spark.local"))) {
+        if("yarn-client".equalsIgnoreCase(sparkConf.get("spark.master"))){
+          sparkConf.set("spark.yarn.dist.jars", kylinConfig.sparderJars)
+        } else {
+          sparkConf.set("spark.jars", kylinConfig.sparderJars)
+        }
+
+        val filePath = KylinConfig.getInstanceFromEnv.sparderJars
+          .split(",")
+          .filter(p => p.contains("storage-parquet"))
+          .apply(0)
+        val fileName = filePath.substring(filePath.lastIndexOf('/') + 1)
+        sparkConf.set("spark.executor.extraClassPath", fileName)
+      }
+
+      sparkConf
+    }
+
+
+    /**
+      * For R/W splitting: overlay the read cluster's Hadoop configuration
+      * onto the SparkConf.
+      *
+      * @param sparkConf the SparkConf to enrich with the read cluster's settings
+      * @return the same SparkConf, for chaining
+      */
+    def addReadConfToSparkConf(sparkConf: SparkConf): SparkConf = {
+      val readHadoopConfDir = kylinConfig
+        .getColumnarSparkEnv("HADOOP_CONF_DIR")
+      if (!new File(readHadoopConfDir).exists()) {
+        throw new IllegalArgumentException(
+          "kylin.storage.columnar.spark-env.HADOOP_CONF_DIR not found: " + readHadoopConfDir)
+      }
+      overrideHadoopConfigToSparkConf(readHadoopConfDir, sparkConf)
+      changeStagingDir(sparkConf, readHadoopConfDir)
+      sparkConf
+    }
+
+    private def changeStagingDir(sparkConf: SparkConf, readHadoopConf: String) = {
+      val coreProperties: Properties =
+        XmlUtils.loadProp(readHadoopConf + "/core-site.xml")
+      val path = new Path(coreProperties.getProperty("fs.defaultFS"))
+      val homePath =
+        path.getFileSystem(new Configuration()).getHomeDirectory.toString
+      sparkConf.set("spark.yarn.stagingDir", homePath)
+    }
+
+    private def overrideHadoopConfigToSparkConf(
+                                                 readHadoopConf: String,
+                                                 sparkConf: SparkConf): Unit = {
+      val overrideFiles =
+        kylinConfig.getParquetSeparateOverrideFiles.split(",")
+      overrideFiles.foreach { configFile =>
+        logInfo("find config file : " + configFile)
+        val cleanedPath = Paths.get(readHadoopConf, configFile)
+        val properties: Properties =
+          XmlUtils.loadProp(cleanedPath.toString)
+        properties
+          .entrySet()
+          .asScala
+          .filter(_.getValue.asInstanceOf[String].nonEmpty)
+          .foreach { entry =>
+            logInfo("override : " + entry)
+            sparkConf.set(
+              "spark.hadoop." + entry.getKey.asInstanceOf[String],
+              entry.getValue.asInstanceOf[String])
+          }
+      }
+    }
+  }
+
+}
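Note that initSparkConf derives spark.sql.shuffle.partitions from the executor
topology when it is not configured explicitly; since SparkConf.get(key) throws
NoSuchElementException for a missing key, spark.executor.instances and
spark.executor.cores must both be present in Kylin's Spark conf. The derivation,
in isolation:

    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
            .set("spark.executor.instances", "4")
            .set("spark.executor.cores", "5");
    int shufflePartitions = Integer.parseInt(conf.get("spark.executor.instances"))
            * Integer.parseInt(conf.get("spark.executor.cores")); // 20: one per core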
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
new file mode 100644
index 0000000..ecaa5c8
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.kylin.ext.ClassLoaderUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.KylinSession._
+
+object SparderEnv extends Logging {
+  @volatile
+  private var spark: SparkSession = _
+
+  def getSparkSession: SparkSession = withClassLoad {
+    if (spark == null || spark.sparkContext.isStopped) {
+      logInfo("Init spark.")
+      initSpark()
+    }
+    spark
+  }
+
+  def isSparkAvailable: Boolean = {
+    spark != null && !spark.sparkContext.isStopped
+  }
+
+  def restartSpark(): Unit = withClassLoad {
+    this.synchronized {
+      if (spark != null && !spark.sparkContext.isStopped) {
+        spark.stop()
+      }
+
+      logInfo("Restart Spark")
+      init()
+    }
+  }
+
+  def init(): Unit = withClassLoad {
+    getSparkSession
+  }
+
+  def getSparkConf(key: String): String = {
+    getSparkSession.sparkContext.conf.get(key)
+  }
+
+  def getActiveJobs(): Int = {
+    SparderEnv.getSparkSession.sparkContext.jobProgressListener.activeJobs.size
+  }
+
+  def getFailedJobs(): Int = {
+    SparderEnv.getSparkSession.sparkContext.jobProgressListener.failedJobs.size
+  }
+
+  def getAsyncResultCore: Int = {
+    val sparkConf = getSparkSession.sparkContext.getConf
+    val instances = sparkConf.get("spark.executor.instances").toInt
+    val cores = sparkConf.get("spark.executor.cores").toInt
+    Math.round(instances * cores / 3f)
+  }
+
+  def initSpark(): Unit = withClassLoad {
+    this.synchronized {
+      if (spark == null || spark.sparkContext.isStopped) {
+        val sparkSession = System.getProperty("spark.local") match {
+          case "true" =>
+            SparkSession.builder
+              .master("local")
+              .appName("test-local-sql-context")
+//                        .enableHiveSupport()
+              .getOrCreateKylinSession()
+          case _ =>
+            SparkSession.builder
+              .appName("test-sql-context")
+              .enableHiveSupport()
+              .getOrCreateKylinSession()
+        }
+        spark = sparkSession
+
+        logInfo("Spark context started successfully with stack trace:")
+        logInfo(Thread.currentThread().getStackTrace.mkString("\n"))
+        logInfo("Class loader: " + Thread.currentThread().getContextClassLoader.toString)
+      }
+    }
+  }
+
+  /**
+    * To keep Spark from being affected by the caller's environment, run the
+    * given action with the dedicated Spark classloader as the context classloader.
+    *
+    * @param body the action that touches Spark
+    * @tparam T the action's result type
+    * @return whatever the action returns
+    */
+  def withClassLoad[T](body: => T): T = {
+    val originClassLoader = Thread.currentThread().getContextClassLoader
+    Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader)
+    try {
+      body
+    } finally {
+      // restore the original classloader even if the action throws
+      Thread.currentThread().setContextClassLoader(originClassLoader)
+    }
+  }
+}
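SparderEnv is a Scala object, so Java code in the query server can use the
generated static forwarders directly; a minimal sketch:

    import org.apache.spark.sql.SparderEnv;
    import org.apache.spark.sql.SparkSession;

    // Lazily creates (or reuses) the shared KylinSession under the Spark classloader.
    SparkSession spark = SparderEnv.getSparkSession();
    spark.sql("select 1").show();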
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala b/engine-spark/src/main/java/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
new file mode 100644
index 0000000..c399381
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}
+
+/**
+  * Hive session state with some extra rules, e.g. the find-data-source-table rule.
+  *
+  * @param sparkSession
+  * @param parentState
+  */
+class KylinHiveSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+  experimentalMethods.extraOptimizations = {
+      Seq()
+  }
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  override protected def newBuilder: NewBuilder =
+    new KylinHiveSessionStateBuilder(_, _)
+
+}
+
+/**
+  * Used when Hive support is not enabled.
+  *
+  * @param sparkSession
+  * @param parentState
+  */
+class KylinSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
+  extends BaseSessionStateBuilder(sparkSession, parentState) {
+  experimentalMethods.extraOptimizations = {
+      Seq()
+  }
+
+  override protected def newBuilder: NewBuilder =
+    new KylinSessionStateBuilder(_, _)
+
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/manager/UdfManager.scala b/engine-spark/src/main/java/org/apache/spark/sql/manager/UdfManager.scala
new file mode 100644
index 0000000..499cbcd
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/manager/UdfManager.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.manager
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
+import org.apache.kylin.metadata.datatype.DataType
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.udf.SparderAggFun
+
+class UdfManager(sparkSession: SparkSession) extends Logging {
+  private var udfCache: Cache[String, String] = _
+
+  udfCache = CacheBuilder.newBuilder
+    .maximumSize(100)
+    .expireAfterWrite(1, TimeUnit.HOURS)
+    .removalListener(new RemovalListener[String, String]() {
+      override def onRemoval(notification: RemovalNotification[String, String]): Unit = {
+        val func = notification.getKey
+        logInfo(s"remove function $func")
+      }
+    })
+    .build
+    .asInstanceOf[Cache[String, String]]
+
+  def destroy(): Unit = {
+    udfCache.cleanUp()
+  }
+
+  def doRegister(dataType: DataType, funcName: String): String = {
+    val name = genKey(dataType, funcName)
+    val cacheFunc = udfCache.getIfPresent(name)
+    if (cacheFunc == null) {
+      udfCache.put(name, "")
+      sparkSession.udf.register(name, new SparderAggFun(funcName, dataType))
+    }
+    name
+  }
+
+  def genKey(dataType: DataType, funcName: String): String = {
+    dataType.toString
+      .replace("(", "_")
+      .replace(")", "_")
+      .replace(",", "_") + funcName
+  }
+
+}
+
+object UdfManager {
+
+  private val defaultManager = new AtomicReference[UdfManager]
+  private val defaultSparkSession: AtomicReference[SparkSession] =
+    new AtomicReference[SparkSession]
+
+  def refresh(sc: JavaSparkContext): Unit = {
+    val sparkSession = SparkSession.builder.config(sc.getConf).getOrCreate
+
+    defaultManager.get().destroy()
+    create(sparkSession)
+  }
+
+  def create(sparkSession: SparkSession): Unit = {
+    val manager = new UdfManager(sparkSession)
+    defaultManager.set(manager)
+    defaultSparkSession.set(sparkSession)
+  }
+
+  def create(sc: JavaSparkContext): Unit = {
+    val sparkSession = SparkSession.builder.config(sc.getConf).getOrCreate
+    create(sparkSession)
+
+  }
+
+  def register(dataType: DataType, func: String): String = {
+    defaultManager.get().doRegister(dataType, func)
+  }
+
+}
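Each (data type, function) pair is registered once under a name derived from the
type, and that name is what the generated Spark SQL calls. For example, assuming
the standard Kylin function name COUNT_DISTINCT for the HLL measure:

    import org.apache.kylin.metadata.datatype.DataType;
    import org.apache.spark.sql.manager.UdfManager;

    // genKey maps "hllc(10)" to "hllc_10_"; registration is idempotent per name.
    String udaf = UdfManager.register(DataType.getType("hllc(10)"), "COUNT_DISTINCT");
    // udaf == "hllc_10_COUNT_DISTINCT", usable in generated SQL:
    //   SELECT hllc_10_COUNT_DISTINCT(measure_col) FROM ...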
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/udf/SparderAggFun.scala b/engine-spark/src/main/java/org/apache/spark/sql/udf/SparderAggFun.scala
new file mode 100644
index 0000000..d84b31d
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/udf/SparderAggFun.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.udf
+
+import java.nio.ByteBuffer
+
+import org.apache.kylin.gridtable.GTInfo
+import org.apache.kylin.measure.MeasureAggregator
+import org.apache.kylin.metadata.datatype.DataTypeSerializer
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparderTypeUtil
+
+class SparderAggFun(funcName: String, dataTp: org.apache.kylin.metadata.datatype.DataType)
+  extends UserDefinedAggregateFunction
+    with Logging {
+
+  protected val _inputDataType = {
+    val schema = StructType(Seq(StructField("inputBinary", BinaryType)))
+    schema
+  }
+
+  protected val _bufferSchema: StructType = {
+    val schema = StructType(Seq(StructField("bufferBinary", BinaryType)))
+    schema
+  }
+
+  protected val _returnDataType: DataType =
+    SparderTypeUtil.kylinTypeToSparkResultType(dataTp)
+
+  protected var byteBuffer: ByteBuffer = null
+  protected var init = false
+  protected var gtInfo: GTInfo = _
+  protected var measureAggregator: MeasureAggregator[Any] = _
+  protected var colId: Int = _
+  protected var serializer: DataTypeSerializer[Any] = _
+  protected var aggregator: MeasureAggregator[Any] = _
+
+  var time = System.currentTimeMillis()
+
+  override def bufferSchema: StructType = _bufferSchema
+
+  override def inputSchema: StructType = _inputDataType
+
+  override def deterministic: Boolean = true
+
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    val isSum0 = (funcName == "$SUM0")
+    if (byteBuffer == null) {
+      serializer = DataTypeSerializer.create(dataTp).asInstanceOf[DataTypeSerializer[Any]]
+      byteBuffer = ByteBuffer.allocate(serializer.maxLength)
+    }
+
+    aggregator = MeasureAggregator
+      .create(if (isSum0) "COUNT" else funcName, dataTp)
+      .asInstanceOf[MeasureAggregator[Any]]
+
+    aggregator.reset()
+
+    val initVal = if (isSum0) {
+      // $SUM0 is the rewritten form of COUNT, which should return 0 instead of null in case of no input
+      byteBuffer.clear()
+      serializer.serialize(aggregator.getState, byteBuffer)
+      byteBuffer.array().slice(0, byteBuffer.position())
+    } else {
+      null
+    }
+    buffer.update(0, initVal)
+  }
+
+  val MAX_BUFFER_CAP: Int = 50 * 1024 * 1024
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    merge(buffer, input)
+  }
+
+  override def merge(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    if (!input.isNullAt(0)) {
+      try {
+        val byteArray = input.apply(0).asInstanceOf[Array[Byte]]
+        if (byteArray.length == 0) {
+          return
+        }
+        val oldValue = if(buffer.isNullAt(0)) null else serializer.deserialize(ByteBuffer.wrap(buffer.apply(0).asInstanceOf[Array[Byte]]))
+        val newValue = serializer.deserialize(ByteBuffer.wrap(byteArray))
+
+        val aggedValue = aggregator.aggregate(oldValue, newValue)
+
+        if (aggedValue != null) {
+          byteBuffer.clear()
+          serializer.serialize(aggedValue, byteBuffer)
+          buffer.update(0, byteBuffer.array().slice(0, byteBuffer.position()))
+        }
+      } catch {
+        case e: Exception =>
+          throw new Exception(
+            "error data is: " + input
+              .apply(0)
+              .asInstanceOf[Array[Byte]]
+              .mkString(","),
+            e)
+      }
+    }
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    if (buffer.isNullAt(0)) {
+      null
+    } else {
+      val ret = dataTp.getName match {
+        case dt if dt.startsWith("percentile") => buffer.apply(0).asInstanceOf[Array[Byte]]
+        case "hllc" => buffer.apply(0).asInstanceOf[Array[Byte]]
+        case "bitmap" => buffer.apply(0).asInstanceOf[Array[Byte]]
+        case "dim_dc" => buffer.apply(0).asInstanceOf[Array[Byte]]
+        case "extendedcolumn" => buffer.apply(0).asInstanceOf[Array[Byte]]
+        case "raw" => buffer.apply(0).asInstanceOf[Array[Byte]]
+        case t if t startsWith "top" => buffer.apply(0).asInstanceOf[Array[Byte]]
+        case _ => null
+      }
+
+      if (ret != null)
+        ret
+      else
+        throw new IllegalArgumentException("unsupported function")
+    }
+  }
+
+  override def toString: String = {
+    s"SparderAggFun@$funcName${dataType.toString}"
+  }
+
+  override def dataType: DataType = _returnDataType
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/util/SparderTypeUtil.scala b/engine-spark/src/main/java/org/apache/spark/sql/util/SparderTypeUtil.scala
new file mode 100644
index 0000000..f2adbcc
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/util/SparderTypeUtil.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.util
+
+import java.math.BigDecimal
+import java.sql.Timestamp
+import java.util.Locale
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.kylin.common.util.DateFormat
+import org.apache.kylin.metadata.datatype.DataType
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+object SparderTypeUtil extends Logging {
+  val DATETIME_FAMILY = List("time", "date", "timestamp", "datetime")
+
+  def isDateTimeFamilyType(dataType: String): Boolean = {
+    DATETIME_FAMILY.contains(dataType.toLowerCase(Locale.ROOT))
+  }
+
+  def isDateType(dataType: String): Boolean = {
+    "date".equalsIgnoreCase(dataType)
+  }
+
+  def isDateTime(sqlTypeName: SqlTypeName): Boolean = {
+    SqlTypeName.DATETIME_TYPES.contains(sqlTypeName)
+  }
+
+  // scalastyle:off
+  def kylinTypeToSparkResultType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    dataTp.getName match {
+      case tp if tp.startsWith("hllc") => BinaryType
+      case tp if tp.startsWith("top") => BinaryType
+      case tp if tp.startsWith("percentile") => BinaryType
+      case tp if tp.startsWith("bitmap") => BinaryType
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "date" => IntegerType
+      case "time" => LongType
+      case "timestamp" => LongType
+      case "datetime" => LongType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" => IntegerType
+      case "int4" => IntegerType
+      case "bigint" => LongType
+      case "long8" => LongType
+      case "float" => FloatType
+      case "double" => DoubleType
+      case "real" => DoubleType
+      case tp if tp.startsWith("varchar") => StringType
+      case tp if tp.startsWith("char") => StringType
+      case "bitmap" => LongType
+      case "dim_dc" => BinaryType
+      case "boolean" => BooleanType
+      case "extendedcolumn" => BinaryType
+      case "raw" => BinaryType
+      case noSupport => throw new IllegalArgumentException(s"Unsupported data type: $noSupport")
+    }
+  }
+
+  // scalastyle:off
+  def kylinSQLTypeToSparkType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    dataTp.getName match {
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "date" => DateType
+      case "time" => DateType
+      case "timestamp" => TimestampType
+      case "datetime" => DateType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" => IntegerType
+      case "int4" => IntegerType
+      case "bigint" => LongType
+      case "long8" => LongType
+      case "float" => FloatType
+      case "double" => DoubleType
+      case "real" => DoubleType
+      case tp if tp.startsWith("varchar") => StringType
+      case tp if tp.startsWith("char") => StringType
+      case "bitmap" => LongType
+      case "dim_dc" => LongType
+      case "boolean" => BooleanType
+      case noSupport => throw new IllegalArgumentException(s"Unsupported data type: $noSupport")
+    }
+  }
+
+  // scalastyle:off
+  def convertSqlTypeNameToSparkType(sqlTypeName: SqlTypeName): String = {
+    sqlTypeName match {
+      case SqlTypeName.DECIMAL => "decimal"
+      case SqlTypeName.CHAR => "string"
+      case SqlTypeName.VARCHAR => "string"
+      case SqlTypeName.INTEGER => "int"
+      case SqlTypeName.TINYINT => "byte"
+      case SqlTypeName.SMALLINT => "short"
+      case SqlTypeName.BIGINT => "long"
+      case SqlTypeName.FLOAT => "float"
+      case SqlTypeName.DOUBLE => "double"
+      case SqlTypeName.DATE => "date"
+      case SqlTypeName.TIMESTAMP => "timestamp"
+      case SqlTypeName.BOOLEAN => "boolean"
+      case noSupport => throw new IllegalArgumentException(s"Unsupported data type: $noSupport")
+    }
+  }
+
+  // scalastyle:off
+  def kylinCubeDataTypeToSparkType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    dataTp.getName match {
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "date" => DateType
+      case "time" => DateType
+      case "timestamp" => TimestampType
+      case "datetime" => DateType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" => IntegerType
+      case "int4" => IntegerType
+      case "bigint" => LongType
+      case "long8" => LongType
+      case "float" => FloatType
+      case "double" => DoubleType
+      case "real" => DoubleType
+      case tp if tp.startsWith("varchar") => StringType
+      case tp if tp.startsWith("char") => StringType
+      case "bitmap" => LongType
+      case "dim_dc" => LongType
+      case "boolean" => BooleanType
+      case noSupport => throw new IllegalArgumentException(s"Unsupported data type: $noSupport")
+    }
+  }
+
+  // for reader
+  // scalastyle:off
+  def kylinDimensionDataTypeToSparkType(dataTp: String): org.apache.spark.sql.types.DataType = {
+    dataTp match {
+      case "string" => StringType
+      case "date" => LongType
+      case "time" => LongType
+      case "timestamp" => LongType
+      case "datetime" => LongType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" => IntegerType
+      case "int4" => IntegerType
+      case "bigint" => LongType
+      case "long8" => LongType
+      case "float" => FloatType
+      case "double" => DoubleType
+      case "real" => DoubleType
+      case tp if tp.startsWith("varchar") => StringType
+      case tp if tp.startsWith("char") => StringType
+      case "bitmap" => LongType
+      case "dim_dc" => LongType
+      case "decimal" => DecimalType(19, 4)
+      case tp if tp.startsWith("decimal") && tp.contains("(") => {
+        try {
+          val precisionAndScale = tp.replace("decimal", "").replace("(", "").replace(")", "").split(",")
+          DataTypes.createDecimalType(precisionAndScale(0).toInt, precisionAndScale(1).toInt)
+        } catch {
+          case e: Exception =>
+            throw new IllegalArgumentException(s"Unsupported data type : $tp", e)
+        }
+      }
+      case "boolean" => BooleanType
+      case noSupport => throw new IllegalArgumentException(s"Unsupported data type: $noSupport")
+    }
+  }
+
+  // scalastyle:off
+  def kylinRawTableSQLTypeToSparkType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    dataTp.getName match {
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "date" => DateType
+      case "time" => DateType
+      case "timestamp" => TimestampType
+      case "datetime" => DateType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" => IntegerType
+      case "int4" => IntegerType
+      case "bigint" => LongType
+      case "long8" => LongType
+      case "float" => FloatType
+      case "double" => DoubleType
+      case "real" => DoubleType
+      case tp if tp.startsWith("char") => StringType
+      case tp if tp.startsWith("varchar") => StringType
+      case "bitmap" => LongType
+      case "dim_dc" => LongType
+      case "boolean" => BooleanType
+      case noSupport => throw new IllegalArgumentException(s"Unsupported data type: $noSupport")
+    }
+  }
+
+
+  def convertSqlTypeToSparkType(dt: RelDataType): org.apache.spark.sql.types.DataType = {
+    dt.getSqlTypeName match {
+      case SqlTypeName.DECIMAL => DecimalType(dt.getPrecision, dt.getScale)
+      case SqlTypeName.CHAR => StringType
+      case SqlTypeName.VARCHAR => StringType
+      case SqlTypeName.INTEGER => IntegerType
+      case SqlTypeName.TINYINT => ByteType
+      case SqlTypeName.SMALLINT => ShortType
+      case SqlTypeName.BIGINT => LongType
+      case SqlTypeName.FLOAT => FloatType
+      case SqlTypeName.DOUBLE => DoubleType
+      case SqlTypeName.DATE => DateType
+      case SqlTypeName.TIMESTAMP => TimestampType
+      case SqlTypeName.BOOLEAN => BooleanType
+      case noSupport => throw new IllegalArgumentException(s"Unsupported data type: $noSupport")
+    }
+  }
+
+  // scalastyle:off
+  def convertStringToValue(s: Any, rowType: RelDataType, toCalcite: Boolean): Any = {
+    val sqlTypeName = rowType.getSqlTypeName
+    if (s == null) {
+      null
+    } else if (s.toString.isEmpty) {
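+      // empty string: fall back to a type-appropriate default value rather than failing to parse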
+      sqlTypeName match {
+        case SqlTypeName.DECIMAL => new java.math.BigDecimal(0)
+        case SqlTypeName.CHAR => s.toString
+        case SqlTypeName.VARCHAR => s.toString
+        case SqlTypeName.INTEGER => 0
+        case SqlTypeName.TINYINT => 0.toByte
+        case SqlTypeName.SMALLINT => 0.toShort
+        case SqlTypeName.BIGINT => 0L
+        case SqlTypeName.FLOAT => 0f
+        case SqlTypeName.DOUBLE => 0d
+        case SqlTypeName.DATE => 0
+        case SqlTypeName.TIMESTAMP => 0L
+        case SqlTypeName.TIME => 0L
+        case SqlTypeName.BOOLEAN => null
+        case null => null
+        case _ => null
+      }
+    } else {
+      try {
+        val a: Any = sqlTypeName match {
+          case SqlTypeName.DECIMAL =>
+            if (s.isInstanceOf[java.lang.Double] || s
+              .isInstanceOf[java.lang.Float] || s.toString.contains(".")) {
+              new java.math.BigDecimal(s.toString)
+                .setScale(rowType.getScale, BigDecimal.ROUND_HALF_EVEN)
+            } else {
+              new java.math.BigDecimal(s.toString)
+            }
+          case SqlTypeName.CHAR => s.toString
+          case SqlTypeName.VARCHAR => s.toString
+          case SqlTypeName.INTEGER => s.toString.toInt
+          case SqlTypeName.TINYINT => s.toString.toByte
+          case SqlTypeName.SMALLINT => s.toString.toShort
+          case SqlTypeName.BIGINT => s.toString.toLong
+          case SqlTypeName.FLOAT => java.lang.Float.parseFloat(s.toString)
+          case SqlTypeName.DOUBLE => java.lang.Double.parseDouble(s.toString)
+          case SqlTypeName.DATE => {
+            // the time here is timezone-aware
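+            // Calcite models DATE as days since epoch (int); non-Calcite callers get epoch seconds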
+            val string = s.toString
+            if (string.contains("-")) {
+              val time = DateFormat.stringToDate(string).getTime
+              if (toCalcite) {
+                (time / (3600 * 24 * 1000)).toInt
+              } else {
+                // ms to s
+                time / 1000
+              }
+            } else {
+              // should not normally reach here
+              if (toCalcite) {
+                (toCalciteTimestamp(DateFormat.stringToMillis(string)) / (3600 * 24 * 1000)).toInt
+              } else {
+                DateFormat.stringToMillis(string)
+              }
+            }
+          }
+          case SqlTypeName.TIMESTAMP | SqlTypeName.TIME => {
+            val ts = s.asInstanceOf[Timestamp].getTime
+            if (toCalcite) {
+              ts
+            } else {
+              // ms to s
+              ts / 1000
+            }
+          }
+          case SqlTypeName.BOOLEAN => s
+          case _ => s.toString
+        }
+        a
+      } catch {
+        case th: Throwable =>
+          logError(s"Error for convert value : $s , class: ${s.getClass}", th)
+          safetyConvertStringToValue(s, rowType, toCalcite)
+      }
+    }
+  }
+
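+  // Fallback used when the direct parse in convertStringToValue fails: integral
+  // types are routed through toDouble first so inputs like "1.0" still convert.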
+  def safetyConvertStringToValue(s: Any, rowType: RelDataType, toCalcite: Boolean): Any = {
+    try {
+      rowType.getSqlTypeName match {
+        case SqlTypeName.DECIMAL =>
+          if (s.isInstanceOf[java.lang.Double] || s
+            .isInstanceOf[java.lang.Float] || s.toString.contains(".")) {
+            new java.math.BigDecimal(s.toString)
+              .setScale(rowType.getScale, BigDecimal.ROUND_HALF_EVEN)
+          } else {
+            new java.math.BigDecimal(s.toString)
+          }
+        case SqlTypeName.CHAR => s.toString
+        case SqlTypeName.VARCHAR => s.toString
+        case SqlTypeName.INTEGER => s.toString.toDouble.toInt
+        case SqlTypeName.TINYINT => s.toString.toDouble.toByte
+        case SqlTypeName.SMALLINT => s.toString.toDouble.toShort
+        case SqlTypeName.BIGINT => s.toString.toDouble.toLong
+        case SqlTypeName.FLOAT => java.lang.Float.parseFloat(s.toString)
+        case SqlTypeName.DOUBLE => java.lang.Double.parseDouble(s.toString)
+        case SqlTypeName.DATE => {
+          // the time here is timezone-aware
+          val string = s.toString
+          if (string.contains("-")) {
+            val time = DateFormat.stringToDate(string).getTime
+            if (toCalcite) {
+              (time / (3600 * 24 * 1000)).toInt
+            } else {
+              // ms to s
+              time / 1000
+            }
+          } else {
+            // should not normally reach here
+            if (toCalcite) {
+              (toCalciteTimestamp(DateFormat.stringToMillis(string)) / (3600 * 24 * 1000)).toInt
+            } else {
+              DateFormat.stringToMillis(string)
+            }
+          }
+        }
+        case SqlTypeName.TIMESTAMP | SqlTypeName.TIME => {
+          val ts = s.asInstanceOf[Timestamp].getTime
+          if (toCalcite) {
+            ts
+          } else {
+            // ms to s
+            ts / 1000
+          }
+        }
+        case SqlTypeName.BOOLEAN => s
+        case _ => s.toString
+      }
+    } catch {
+      case th: Throwable =>
+        throw new RuntimeException(s"Error for convert value : $s , class: ${s.getClass}", th)
+    }
+  }
+
+  // scalastyle:off
+  def convertStringToResultValue(s: Any, rowType: String, toCalcite: Boolean): Any = {
+    if (s == null) {
+      val a: Any = rowType match {
+        case "DECIMAL" => new java.math.BigDecimal(0)
+        case "CHAR" => null
+        case "VARCHAR" => null
+        case "INTEGER" => 0
+        case "TINYINT" => 0.toByte
+        case "SMALLINT" => 0.toShort
+        case "BIGINT" => 0L
+        case "FLOAT" => 0f
+        case "DOUBLE" => 0d
+        case "DATE" => 0
+        case "TIMESTAMP" => 0L
+        case "TIME" => 0L
+        case "BOOLEAN" => null
+        case null => null
+        case _ => null
+      }
+      a
+    } else {
+      try {
+        val a: Any = rowType match {
+          case "DECIMAL" => new java.math.BigDecimal(s.toString)
+          case "CHAR" => s.toString
+          case "VARCHAR" => s.toString
+          case "INTEGER" => s.toString.toInt
+          case "TINYINT" => s.toString.toByte
+          case "SMALLINT" => s.toString.toShort
+          case "BIGINT" => s.toString.toLong
+          case "FLOAT" => java.lang.Float.parseFloat(s.toString)
+          case "DOUBLE" => java.lang.Double.parseDouble(s.toString)
+          case "DATE" => {
+            if (toCalcite)
+              DateFormat.formatToDateStr(DateFormat.stringToMillis(s.toString))
+            else
+              DateFormat.stringToMillis(s.toString)
+          }
+          case "TIMESTAMP" | "TIME" =>
+            DateFormat.formatToTimeStr(s.asInstanceOf[Timestamp].getTime)
+          case "BOOLEAN" => s
+          case _ => s.toString
+        }
+        a
+      } catch {
+        case th: Throwable =>
+          logError(s"Error for convert value : $s , class: ${s.getClass}", th)
+          throw th
+      }
+    }
+  }
+
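+  // Render each Spark Row as a separator-joined line, converting every cell by the
+  // SQL type name registered for its column index in typeMap.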
+  def convertRowToRow(
+                       rows: Iterator[Row],
+                       typeMap: Map[Int, String],
+                       separator: String): Iterator[String] = {
+    rows.map {
+      row =>
+        row.toSeq.zipWithIndex
+          .map {
+            case (cell, i) =>
+              val value = SparderTypeUtil
+                .convertStringToResultValue(cell, typeMap.apply(i), toCalcite = true)
+              if (value == null) "" else value
+          }
+          .mkString(separator)
+    }
+
+  }
+
+  // ms to second
+  def toSparkTimestamp(calciteTimestamp: Long): java.lang.Long = {
+    calciteTimestamp / 1000
+  }
+
+  // ms to microseconds; Spark expects timestamps in microseconds.
+  def toSparkMicrosecond(calciteTimestamp: Long): java.lang.Long = {
+    calciteTimestamp * 1000
+  }
+
+  // ms to day
+  def toSparkDate(calciteTimestamp: Long): java.lang.Integer = {
+    (calciteTimestamp / 1000 / 3600 / 24).toInt
+  }
+
+  def toCalciteTimestamp(sparkTimestamp: Long): Long = {
+    sparkTimestamp * 1000
+  }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/util/KylinReflectUtils.scala b/engine-spark/src/main/java/org/apache/spark/util/KylinReflectUtils.scala
new file mode 100644
index 0000000..a139273
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/util/KylinReflectUtils.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.lang.reflect.Field
+
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.{SPARK_VERSION, SparkContext}
+
+import scala.reflect.runtime._
+import scala.reflect.runtime.universe._
+
+object KylinReflectUtils {
+  private val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+
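+  // Builds a session state for the given KylinSession via reflection, picking the
+  // Hive-aware builder when the catalog implementation is "hive" (Spark 2.2.x only).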
+  def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any = {
+    if (SPARK_VERSION.startsWith("2.2")) {
+      val catalog = sparkContext.getConf.get(CATALOG_IMPLEMENTATION.key, "in-memory")
+      val className =
+        if ("hive".equals(catalog)) "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
+        else "org.apache.spark.sql.hive.KylinSessionStateBuilder"
+      val tuple = createObject(className, kylinSession, None)
+      val method = tuple._2.getMethod("build")
+      method.invoke(tuple._1)
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  /**
+    * Returns the value of a field from an object through reflection.
+    *
+    * @param name - name of the field being retrieved.
+    * @param obj  - object from which the field is retrieved.
+    * @tparam T   - runtime type of the object.
+    * @return the field value, or null if no such field exists.
+    */
+  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
+    val im = rm.reflect(obj)
+
+    im.symbol.typeSignature.members.find(_.name.toString.equals(name))
+      .map(l => im.reflectField(l.asTerm).get).orNull
+  }
+
+
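+  // Instantiates a class by name through its first constructor and returns both the
+  // instance and its Class, e.g. for the reflective "build" call in getSessionState.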
+  def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
+    val clazz = Utils.classForName(className)
+    val ctor = clazz.getConstructors.head
+    ctor.setAccessible(true)
+    (ctor.newInstance(conArgs: _*), clazz)
+  }
+
+  def getObjectField(clazz: Class[_], fieldName: String): Field = {
+    val field = clazz.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field
+  }
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/util/XmlUtils.scala b/engine-spark/src/main/java/org/apache/spark/util/XmlUtils.scala
new file mode 100644
index 0000000..6d3f0cd
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/util/XmlUtils.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{BufferedInputStream, File, FileInputStream, InputStream}
+import java.util.Properties
+
+import javax.xml.parsers.{DocumentBuilder, DocumentBuilderFactory}
+import org.apache.hadoop.util.StringInterner
+import org.slf4j.LoggerFactory
+import org.w3c.dom.{Document, Element, Text}
+
+import scala.collection.immutable.Range
+
+object XmlUtils {
+
+  private val logger = LoggerFactory.getLogger(XmlUtils.getClass)
+
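+  /**
+    * Loads a Hadoop-style configuration XML file into java.util.Properties.
+    * Expected layout: a <configuration> root holding <property> elements,
+    * each with a <name> and a <value> child.
+    */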
+  def loadProp(path: String): Properties = {
+    val docBuilderFactory = DocumentBuilderFactory.newInstance
+    docBuilderFactory.setIgnoringComments(true)
+
+    // allow XInclude directives in the XML file
+    docBuilderFactory.setNamespaceAware(true)
+    try docBuilderFactory.setXIncludeAware(true)
+    catch {
+      case e: UnsupportedOperationException => // parser does not support XInclude; ignore
+    }
+    val builder = docBuilderFactory.newDocumentBuilder
+    var doc: Document = null
+    var root: Element = null
+    val properties = new Properties()
+    val file = new File(path).getAbsoluteFile
+    if (file.exists) {
+      doc = parse(builder, new BufferedInputStream(new FileInputStream(file)), path)
+    } else {
+      throw new RuntimeException("File not found in path: " + path)
+    }
+    root = doc.getDocumentElement
+    if (!("configuration" == root.getTagName)) {
+      logger.error("bad conf file: top-level element not <configuration>")
+    }
+
+    val props = root.getChildNodes
+    for (i <- Range(0, props.getLength)) {
+      val propNode = props.item(i)
+
+      propNode match {
+        case prop: Element =>
+          if ("configuration".equals(prop.getTagName())) {}
+          if (!"property".equals(prop.getTagName())) {
+            logger.warn("bad conf file: element not <property>")
+          }
+
+          val fields = prop.getChildNodes
+          var attr: String = null
+          var value: String = null
+          var finalParameter: Boolean = false
+          val source: java.util.LinkedList[String] =
+            new java.util.LinkedList[String]
+          for (j <- Range(0, fields.getLength)) {
+            val fieldNode = fields.item(j)
+            fieldNode match {
+              case field: Element =>
+                if ("name" == field.getTagName && field.hasChildNodes) {
+                  attr =
+                    StringInterner.weakIntern(field.getFirstChild.asInstanceOf[Text].getData.trim)
+                }
+                if ("value" == field.getTagName && field.hasChildNodes) {
+                  value = StringInterner.weakIntern(field.getFirstChild.asInstanceOf[Text].getData)
+                }
+              case _ =>
+            }
+            if (attr != null && value != null) {
+              properties.setProperty(attr, value)
+            }
+          }
+        case _ =>
+      }
+
+    }
+    properties
+  }
+
+  def parse(builder: DocumentBuilder, is: InputStream, systemId: String): Document = {
+    if (is == null) {
+      return null
+    }
+    try {
+      builder.parse(is, systemId)
+    } finally {
+      is.close()
+    }
+  }
+}
diff --git a/examples/test_case_data/webapps/META-INF/context.xml b/examples/test_case_data/webapps/META-INF/context.xml
new file mode 100644
index 0000000..0ad90dc
--- /dev/null
+++ b/examples/test_case_data/webapps/META-INF/context.xml
@@ -0,0 +1,38 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~  
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~  
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!-- The contents of this file will be loaded for each web application -->
+<Context allowLinking="true">
+
+    <!-- Default set of monitored resources -->
+    <WatchedResource>WEB-INF/web.xml</WatchedResource>
+
+    <!-- Uncomment this to disable session persistence across Tomcat restarts -->
+    <!--
+    <Manager pathname="" />
+    -->
+
+    <!-- Uncomment this to enable Comet connection tracking (provides events
+         on session expiration as well as webapp lifecycle) -->
+    <!--
+    <Valve className="org.apache.catalina.valves.CometConnectionManagerValve" />
+    -->
+
+    <Loader loaderClass="org.apache.kylin.ext.DebugTomcatClassLoader"/>
+
+</Context>
diff --git a/kylin-it/jacoco-it.exec b/kylin-it/jacoco-it.exec
new file mode 100644
index 0000000..5af2bbf
Binary files /dev/null and b/kylin-it/jacoco-it.exec differ
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 18bb1d9..36cbc33 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -96,6 +96,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-query</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-test</artifactId>
+        </dependency>
 
         <!-- Env & Test -->
 
@@ -338,6 +342,27 @@
         </dependency>
     </dependencies>
 
+    <profiles>
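+        <!-- CI storage profiles: hbaseStorage (storageType=2, default) runs the classic
+             integration tests; parquetStorage (storageType=4) runs the *2Test variants
+             that go through the Spark query engine instead. -->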
+        <profile>
+            <id>hbaseStorage</id>
+            <properties>
+                <storageType>2</storageType>
+                <excludeTest>**/ITCombination2Test.java,**/ITFailfastQuery2Test.java,**/ITJDBCDriver2Test.java</excludeTest>
+            </properties>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+        </profile>
+
+        <profile>
+            <id>parquetStorage</id>
+            <properties>
+                <storageType>4</storageType>
+                <excludeTest>**/ITCombinationTest.java,**/ITFailfastQueryTest.java,**/ITJDBCDriverTest.java,**/ITAclTableMigrationToolTest.java</excludeTest>
+            </properties>
+        </profile>
+    </profiles>
+
     <build>
         <plugins>
             <plugin>
@@ -361,123 +386,114 @@
                     </execution>
                 </executions>
             </plugin>
-        </plugins>
-    </build>
 
-    <profiles>
-        <profile>
-            <id>sandbox</id>
-            <activation>
-                <activeByDefault>true</activeByDefault>
-            </activation>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-failsafe-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <id>integration-tests</id>
-                                <goals>
-                                    <goal>integration-test</goal>
-                                </goals>
-                            </execution>
-                            <execution>
-                                <id>verify</id>
-                                <goals>
-                                    <goal>verify</goal>
-                                </goals>
-                            </execution>
-                        </executions>
+            <!--CI plugins-->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>integration-tests</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>verify</id>
+                        <goals>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <excludes>
+                        <exclude>**/*$*</exclude>
+                        <exclude>${excludeTest}</exclude>
+                    </excludes>
+                    <systemProperties>
+                        <property>
+                            <name>log4j.configuration</name>
+                            <value>
+                                file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+                            </value>
+                        </property>
+                    </systemProperties>
+                    <argLine>-Xms1G -Xmx2G -XX:PermSize=128M -XX:MaxPermSize=512M
+                        -Dkylin.server.cluster-servers=localhost:7070
+                        -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it.exec
+                    </argLine>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>build_cube_with_engine</id>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                        <phase>pre-integration-test</phase>
                         <configuration>
-                            <excludes>
-                                <exclude>**/*$*</exclude>
-                            </excludes>
-                            <systemProperties>
-                                <property>
-                                    <name>log4j.configuration</name>
-                                    <value>
-                                        file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
-                                    </value>
-                                </property>
-                            </systemProperties>
-                            <argLine>-Xms1G -Xmx2G -XX:PermSize=128M -XX:MaxPermSize=512M
-                                -Dkylin.server.cluster-servers=localhost:7070
-                                -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it.exec
-                            </argLine>
+                            <skip>${skipTests}</skip>
+                            <classpathScope>test</classpathScope>
+                            <executable>java</executable>
+                            <arguments>
+                                <argument>-Dhdp.version=${hdp.version}</argument>
+                                <argument>-DfastBuildMode=${fastBuildMode}</argument>
+                                <argument>
+                                    -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
+                                </argument>
+                                <argument>-DengineType=${engineType}</argument>
+                                <argument>-DstorageType=${storageType}</argument>
+                                <argument>
+                                    -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+                                </argument>
+                                <argument>
+                                    -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-engine.exec
+                                </argument>
+                                <argument>-classpath</argument>
+                                <classpath />
+                                <argument>org.apache.kylin.provision.BuildCubeWithEngine
+                                </argument>
+                            </arguments>
+                            <workingDirectory>${project.basedir}</workingDirectory>
                         </configuration>
-                    </plugin>
-                    <plugin>
-                        <groupId>org.codehaus.mojo</groupId>
-                        <artifactId>exec-maven-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <id>build_cube_with_engine</id>
-                                <goals>
-                                    <goal>exec</goal>
-                                </goals>
-                                <phase>pre-integration-test</phase>
-                                <configuration>
-                                    <skip>${skipTests}</skip>
-                                    <classpathScope>test</classpathScope>
-                                    <executable>java</executable>
-                                    <arguments>
-                                        <argument>-Dhdp.version=${hdp.version}</argument>
-                                        <argument>-DfastBuildMode=${fastBuildMode}</argument>
-                                        <argument>
-                                            -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
-                                        </argument>
-                                        <argument>-DengineType=${engineType}</argument>
-                                        <argument>
-                                            -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
-                                        </argument>
-                                        <argument>
-                                            -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-engine.exec
-                                        </argument>
-                                        <argument>-classpath</argument>
-                                        <classpath />
-                                        <argument>org.apache.kylin.provision.BuildCubeWithEngine
-                                        </argument>
-                                    </arguments>
-                                    <workingDirectory>${project.basedir}</workingDirectory>
-                                </configuration>
-                            </execution>
-                            <execution>
-                                <id>build_cube_with_stream</id>
-                                <goals>
-                                    <goal>exec</goal>
-                                </goals>
-                                <phase>pre-integration-test</phase>
-                                <configuration>
-                                    <skip>${skipTests}</skip>
-                                    <classpathScope>test</classpathScope>
-                                    <executable>java</executable>
-                                    <arguments>
-                                        <argument>-Dhdp.version=${hdp.version}</argument>
-                                        <argument>-DfastBuildMode=${fastBuildMode}</argument>
-                                        <argument>
-                                            -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
-                                        </argument>
-                                        <argument>
-                                            -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
-                                        </argument>
-                                        <argument>
-                                            -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-stream.exec
-                                        </argument>
-                                        <argument>-classpath</argument>
-                                        <classpath />
-                                        <argument>org.apache.kylin.provision.BuildCubeWithStream
-                                        </argument>
-                                    </arguments>
-                                    <workingDirectory>${project.basedir}</workingDirectory>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
+                    </execution>
+                    <execution>
+                        <id>build_cube_with_stream</id>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                        <phase>pre-integration-test</phase>
+                        <configuration>
+                            <skip>${skipTests}</skip>
+                            <classpathScope>test</classpathScope>
+                            <executable>java</executable>
+                            <arguments>
+                                <argument>-Dhdp.version=${hdp.version}</argument>
+                                <argument>-DfastBuildMode=${fastBuildMode}</argument>
+                                <argument>
+                                    -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
+                                </argument>
+                                <argument>
+                                    -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+                                </argument>
+                                <argument>
+                                    -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-stream.exec
+                                </argument>
+                                <argument>-classpath</argument>
+                                <classpath />
+                                <argument>org.apache.kylin.provision.BuildCubeWithStream
+                                </argument>
+                            </arguments>
+                            <workingDirectory>${project.basedir}</workingDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
 
+        </plugins>
+    </build>
 </project>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriver2Test.java
similarity index 51%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriver2Test.java
index 6a3ad59..558e046 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriver2Test.java
@@ -14,30 +14,16 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.kylin.storage.parquet.cube;
+*/
 
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+package org.apache.kylin.jdbc;
 
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
+import org.apache.kylin.junit.SparkTestRunner;
+import org.junit.runner.RunWith;
 
-    public CubeStorageQuery(CubeInstance cube) {
-        super(cube);
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        return super.search(context, sqlDigest, returnTupleInfo);
-    }
+/**
+ */
+@RunWith(SparkTestRunner.class)
+public class ITJDBCDriver2Test extends ITJDBCDriverTest {
 
-    @Override
-    protected String getGTStorage() {
-        return null;
-    }
 }
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index ec5bc35..9bd741c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,25 +18,9 @@
 
 package org.apache.kylin.provision;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -74,9 +58,24 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class BuildCubeWithEngine {
 
@@ -86,6 +85,7 @@ public class BuildCubeWithEngine {
     protected ExecutableManager jobService;
     private static boolean fastBuildMode = false;
     private static int engineType;
+    private static int storageType;
 
     private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithEngine.class);
 
@@ -131,10 +131,14 @@ public class BuildCubeWithEngine {
         String specifiedEngineType = System.getProperty("engineType");
         if (StringUtils.isNotEmpty(specifiedEngineType)) {
             engineType = Integer.parseInt(specifiedEngineType);
-        } else {
-            engineType = 2;
         }
 
+        String specifiedStorageType = System.getProperty("storageType");
+        if (StringUtils.isNotEmpty(specifiedStorageType)) {
+            storageType = Integer.parseInt(specifiedStorageType);
+        }
+        logger.info("==storageType: " + specifiedStorageType);
+
         System.setProperty(KylinConfig.KYLIN_CONF, confDir);
         System.setProperty("SPARK_HOME", "/usr/local/spark"); // need manually create and put spark to this folder on Jenkins
         System.setProperty("kylin.hadoop.conf.dir", confDir);
@@ -195,6 +199,9 @@ public class BuildCubeWithEngine {
         }
 
         cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+
+        // update engineType and storageType
+        updateCubeDesc("ci_inner_join_cube", "ci_left_join_cube");
     }
 
     public void after() {
@@ -353,12 +360,30 @@ public class BuildCubeWithEngine {
         return doBuildAndMergeOnCube(cubeName);
     }
 
-    @SuppressWarnings("unused")
     private void updateCubeEngineType(String cubeName) throws IOException {
-        CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
-        if (cubeDesc.getEngineType() != engineType) {
-            cubeDesc.setEngineType(engineType);
-            cubeDescManager.updateCubeDesc(cubeDesc);
+        if (engineType != 0) {
+            CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
+            if (cubeDesc.getEngineType() != engineType) {
+                cubeDesc.setEngineType(engineType);
+                cubeDescManager.updateCubeDesc(cubeDesc);
+            }
+        }
+    }
+
+    private void updateCubeStorageType(String cubeName) throws IOException {
+        if (storageType != 0) {
+            CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
+            if (cubeDesc.getStorageType() != storageType) {
+                cubeDesc.setStorageType(storageType);
+                cubeDescManager.updateCubeDesc(cubeDesc);
+            }
+        }
+    }
+
+    private void updateCubeDesc(String... cubeNames) throws IOException {
+        for (String cubeName : cubeNames) {
+            updateCubeEngineType(cubeName);
+            updateCubeStorageType(cubeName);
         }
     }
 
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITCombination2Test.java b/kylin-it/src/test/java/org/apache/kylin/query/ITCombination2Test.java
new file mode 100644
index 0000000..f266c34
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombination2Test.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.junit.SparkTestRunnerWithParametersFactory;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ */
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(SparkTestRunnerWithParametersFactory.class)
+public class ITCombination2Test extends ITKylinQuery2Test {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITCombination2Test.class);
+
+    @BeforeClass
+    public static void setUp() {
+        logger.info("setUp in ITCombination2Test");
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        logger.info("tearDown in ITCombination2Test");
+        clean();
+        Candidate.restorePriorities();
+    }
+
+    /**
+     * Return all config combinations: the first setting specifies the join type
+     * (inner or left), the second whether to force using coprocessors (on, off or
+     * unset), and the third the query engine version.
+     */
+    @Parameterized.Parameters
+    public static Collection<Object[]> configs() {
+        return Arrays.asList(new Object[][] { //
+                { "inner", "on", "v2" }, //
+                { "left", "on", "v2" }, //
+        });
+    }
+
+    public ITCombination2Test(String joinType, String coprocessorToggle, String queryEngine) throws Exception {
+        logger.info("Into combination join type: " + joinType + ", coprocessor toggle: " + coprocessorToggle + ", query engine: " + queryEngine);
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.HYBRID, 0);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        Candidate.setPriorities(priorities);
+        ITKylinQuery2Test.joinType = joinType;
+        setupAll();
+    }
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQuery2Test.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQuery2Test.java
new file mode 100644
index 0000000..228a09b
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQuery2Test.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.query;
+
+import java.util.Map;
+
+import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.ext.ClassLoaderUtils;
+import org.apache.kylin.junit.SparkTestRunner;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.spark.sql.SparderEnv;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+@RunWith(SparkTestRunner.class)
+public class ITFailfastQuery2Test extends ITFailfastQueryTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITFailfastQuery2Test.class);
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        logger.info("setUp in ITFailfastQueryTest");
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.HYBRID, 0);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        Candidate.setPriorities(priorities);
+        joinType = "left";
+        setupAll();
+
+        // init spark
+        ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+        SparderEnv.init();
+        Thread.currentThread().setContextClassLoader(originClassLoader);
+    }
+
+    @After
+    public void cleanUp() {
+        QueryContextFacade.resetCurrent();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        logger.info("tearDown in ITFailfastQuery2Test");
+        Candidate.restorePriorities();
+        clean();
+    }
+
+    @Override
+    @Test
+    public void testQueryExceedMaxScanBytes() throws Exception {
+        logger.info("testQueryExceedMaxScanBytes ignored");
+    }
+
+    @Override
+    @Test
+    public void testQueryNotExceedMaxScanBytes() throws Exception {
+        logger.info("testQueryNotExceedMaxScanBytes ignored");
+    }
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQuery2Test.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQuery2Test.java
new file mode 100644
index 0000000..942e256
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQuery2Test.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+import java.sql.DriverManager;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.ext.ClassLoaderUtils;
+import org.apache.kylin.junit.SparkTestRunner;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.spark.sql.SparderEnv;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+@Ignore("@RunWith(SparkTestRunner.class) is contained by ITCombination2Test")
+@RunWith(SparkTestRunner.class)
+public class ITKylinQuery2Test extends ITKylinQueryTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITKylinQuery2Test.class);
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        logger.info("setUp in ITKylinQuery2Test");
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.HYBRID, 0);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        Candidate.setPriorities(priorities);
+
+        joinType = "left";
+
+        setupAll();
+    }
+
+    protected static void setupAll() throws Exception {
+        //setup env
+        HBaseMetadataTestCase.staticCreateTestMetadata();
+        config = KylinConfig.getInstanceFromEnv();
+
+        //setup cube conn
+        String project = ProjectInstance.DEFAULT_PROJECT_NAME;
+        cubeConnection = QueryConnection.getConnection(project);
+
+        //setup h2
+        h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",
+                "");
+        // Load H2 Tables (inner join)
+        H2Database h2DB = new H2Database(h2Connection, config, project);
+        h2DB.loadAllTables();
+
+        // init spark under the Spark classloader, then restore the original loader
+        ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+        SparderEnv.init();
+        Thread.currentThread().setContextClassLoader(originClassLoader);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        logger.info("tearDown in ITKylin2QueryTest");
+        Candidate.restorePriorities();
+        clean();
+    }
+
+    @Override
+    @Test
+    public void testTimeoutQuery() throws Exception {
+        logger.info("TimeoutQuery ignored.");
+    }
+
+    @Override
+    @Test
+    public void testExpressionQuery() throws Exception {
+        logger.info("ExpressionQuery ignored.");
+    }
+
+    @Override
+    @Test
+    public void testStreamingTableQuery() throws Exception {
+        logger.info("StreamingTableQuery ignored.");
+    }
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index c6d1f62..bb35322 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.query;
 
@@ -461,8 +461,6 @@ public class ITKylinQueryTest extends KylinTestBase {
         execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_values", null, true);
     }
 
-
-
     @Test
     public void testPlan() throws Exception {
         String originProp = System.getProperty("calcite.debug");
diff --git a/kylin-test/pom.xml b/kylin-test/pom.xml
new file mode 100644
index 0000000..ad1437f
--- /dev/null
+++ b/kylin-test/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>2.6.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <name>Apache Kylin - Test</name>
+    <artifactId>kylin-test</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-tomcat-ext</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/kylin-test/src/main/java/org/apache/kylin/junit/EnvUtils.java b/kylin-test/src/main/java/org/apache/kylin/junit/EnvUtils.java
new file mode 100644
index 0000000..19a5b50
--- /dev/null
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/EnvUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.junit;
+
+import com.google.common.collect.Maps;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.Map;
+
+public final class EnvUtils {
+
+    public static boolean checkEnv(String env) {
+        return System.getenv(env) != null;
+    }
+
+    public static void setNormalEnv() throws Exception {
+
+        Map<String, String> newenv = Maps.newHashMap();
+        setDefaultEnv("SPARK_HOME", "../../build/spark", newenv);
+        //setDefaultEnv("hdp.version", "2.4.0.0-169", newenv);
+        setDefaultEnv("ZIPKIN_HOSTNAME", "sandbox", newenv);
+        setDefaultEnv("ZIPKIN_SCRIBE_PORT", "9410", newenv);
+        setDefaultEnv("KAP_HDFS_WORKING_DIR", "/kylin", newenv);
+        changeEnv(newenv);
+
+    }
+
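+    // Test-only workaround: System.getenv() is immutable, so mutate the JVM's
+    // cached environment map through reflection.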
+    protected static void changeEnv(Map<String, String> newenv) throws Exception {
+        Class<?>[] classes = Collections.class.getDeclaredClasses();
+        Map<String, String> env = System.getenv();
+        for (Class<?> cl : classes) {
+            if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+                Field field = cl.getDeclaredField("m");
+                field.setAccessible(true);
+                Object obj = field.get(env);
+                @SuppressWarnings("unchecked")
+                Map<String, String> map = (Map<String, String>) obj;
+                map.putAll(newenv);
+            }
+        }
+    }
+
+    private static void setDefaultEnv(String env, String defaultValue, Map<String, String> newenv) {
+        if (System.getenv(env) == null) {
+            newenv.put(env, defaultValue);
+        }
+    }
+}
diff --git a/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunner.java b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunner.java
new file mode 100644
index 0000000..bcc8c17
--- /dev/null
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.junit;
+
+import org.apache.kylin.ext.ItClassLoader;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.InitializationError;
+
+import java.io.IOException;
+
+public class SparkTestRunner extends BlockJUnit4ClassRunner {
+
+    public static ItClassLoader customClassLoader;
+
+    public SparkTestRunner(Class<?> clazz) throws Exception {
+        super(loadFromCustomClassloader(clazz));
+    }
+
+    // Loads a class in the custom classloader
+    private static Class<?> loadFromCustomClassloader(Class<?> clazz) throws Exception {
+        if(!EnvUtils.checkEnv("SPARK_HOME")){
+            EnvUtils.setNormalEnv();
+        }
+        try {
+            // Only load once to support parallel tests
+            if (customClassLoader == null) {
+                customClassLoader = new ItClassLoader(Thread.currentThread().getContextClassLoader());
+            }
+            return Class.forName(clazz.getName(), false, customClassLoader);
+        } catch (ClassNotFoundException | IOException e) {
+            throw new InitializationError(e);
+        }
+    }
+
+    public static ItClassLoader get() throws IOException {
+        if (customClassLoader == null) {
+            customClassLoader = new ItClassLoader(Thread.currentThread().getContextClassLoader());
+        }
+        return customClassLoader;
+    }
+
+    // Runs junit tests in a separate thread using the custom class loader
+    @Override
+    public void run(final RunNotifier notifier) {
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                SparkTestRunner.super.run(notifier);
+            }
+        };
+        Thread thread = new Thread(runnable);
+        thread.setContextClassLoader(customClassLoader);
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerRunnerWithParameters.java b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerRunnerWithParameters.java
new file mode 100644
index 0000000..3df9831
--- /dev/null
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerRunnerWithParameters.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.junit;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.model.FrameworkField;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.Statement;
+import org.junit.runners.parameterized.TestWithParameters;
+
+/**
+ * A {@link BlockJUnit4ClassRunner} with parameters support. Parameters can be
+ * injected via constructor or into annotated fields.
+ */
+public class SparkTestRunnerRunnerWithParameters extends SparkTestRunner {
+    private final Object[] parameters;
+
+    private final String name;
+
+    public SparkTestRunnerRunnerWithParameters(TestWithParameters test) throws Exception {
+        super(test.getTestClass().getJavaClass());
+        parameters = test.getParameters().toArray(new Object[test.getParameters().size()]);
+        name = test.getName();
+    }
+
+    @Override
+    public Object createTest() throws Exception {
+        if (fieldsAreAnnotated()) {
+            return createTestUsingFieldInjection();
+        } else {
+            return createTestUsingConstructorInjection();
+        }
+    }
+
+    private Object createTestUsingConstructorInjection() throws Exception {
+        return getTestClass().getOnlyConstructor().newInstance(parameters);
+    }
+
+    private Object createTestUsingFieldInjection() throws Exception {
+        List<FrameworkField> annotatedFieldsByParameter = getAnnotatedFieldsByParameter();
+        if (annotatedFieldsByParameter.size() != parameters.length) {
+            throw new Exception("Wrong number of parameters and @Parameter fields." + " @Parameter fields counted: "
+                    + annotatedFieldsByParameter.size() + ", available parameters: " + parameters.length + ".");
+        }
+        Object testClassInstance = getTestClass().getJavaClass().newInstance();
+        for (FrameworkField each : annotatedFieldsByParameter) {
+            Field field = each.getField();
+            Parameter annotation = field.getAnnotation(Parameter.class);
+            int index = annotation.value();
+            try {
+                field.set(testClassInstance, parameters[index]);
+            } catch (IllegalArgumentException iare) {
+                throw new Exception(getTestClass().getName() + ": Trying to set " + field.getName() + " with the value "
+                        + parameters[index] + " that is not the right type ("
+                        + parameters[index].getClass().getSimpleName() + " instead of "
+                        + field.getType().getSimpleName() + ").", iare);
+            }
+        }
+        return testClassInstance;
+    }
+
+    @Override
+    protected String getName() {
+        return name;
+    }
+
+    @Override
+    protected String testName(FrameworkMethod method) {
+        return method.getName() + getName();
+    }
+
+    @Override
+    protected void validateConstructor(List<Throwable> errors) {
+        validateOnlyOneConstructor(errors);
+        if (fieldsAreAnnotated()) {
+            validateZeroArgConstructor(errors);
+        }
+    }
+
+    @Override
+    protected void validateFields(List<Throwable> errors) {
+        super.validateFields(errors);
+        if (fieldsAreAnnotated()) {
+            List<FrameworkField> annotatedFieldsByParameter = getAnnotatedFieldsByParameter();
+            int[] usedIndices = new int[annotatedFieldsByParameter.size()];
+            for (FrameworkField each : annotatedFieldsByParameter) {
+                int index = each.getField().getAnnotation(Parameter.class).value();
+                if (index < 0 || index > annotatedFieldsByParameter.size() - 1) {
+                    errors.add(new Exception("Invalid @Parameter value: " + index + ". @Parameter fields counted: "
+                            + annotatedFieldsByParameter.size() + ". Please use an index between 0 and "
+                            + (annotatedFieldsByParameter.size() - 1) + "."));
+                } else {
+                    usedIndices[index]++;
+                }
+            }
+            for (int index = 0; index < usedIndices.length; index++) {
+                int numberOfUse = usedIndices[index];
+                if (numberOfUse == 0) {
+                    errors.add(new Exception("@Parameter(" + index + ") is never used."));
+                } else if (numberOfUse > 1) {
+                    errors.add(
+                            new Exception("@Parameter(" + index + ") is used more than once (" + numberOfUse + ")."));
+                }
+            }
+        }
+    }
+
+    @Override
+    protected Statement classBlock(RunNotifier notifier) {
+        return childrenInvoker(notifier);
+    }
+
+    @Override
+    protected Annotation[] getRunnerAnnotations() {
+        return new Annotation[0];
+    }
+
+    private List<FrameworkField> getAnnotatedFieldsByParameter() {
+        return getTestClass().getAnnotatedFields(Parameter.class);
+    }
+
+    private boolean fieldsAreAnnotated() {
+        return !getAnnotatedFieldsByParameter().isEmpty();
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerWithParametersFactory.java
similarity index 51%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerWithParametersFactory.java
index 6a3ad59..68ec888 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerWithParametersFactory.java
@@ -16,28 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.storage.parquet.cube;
+package org.apache.kylin.junit;
 
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+import org.junit.runner.Runner;
+import org.junit.runners.model.InitializationError;
+import org.junit.runners.parameterized.ParametersRunnerFactory;
+import org.junit.runners.parameterized.TestWithParameters;
 
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
-
-    public CubeStorageQuery(CubeInstance cube) {
-        super(cube);
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        return super.search(context, sqlDigest, returnTupleInfo);
-    }
-
-    @Override
-    protected String getGTStorage() {
-        return null;
+public class SparkTestRunnerWithParametersFactory implements ParametersRunnerFactory {
+    public Runner createRunnerForTestWithParameters(TestWithParameters test) throws InitializationError {
+        try {
+            return new SparkTestRunnerRunnerWithParameters(test);
+        } catch (Exception e) {
+            throw new InitializationError(e);
+        }
     }
-}
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 3585927..2325fc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
 
     <!-- Spark versions -->
     <spark.version>2.3.2</spark.version>
+
     <kryo.version>4.0.0</kryo.version>
 
     <!-- mysql versions -->
@@ -297,6 +298,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-storage-parquet</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-query</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -337,6 +343,16 @@
       </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-tomcat-ext</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin-test</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-core-common</artifactId>
         <version>${project.version}</version>
         <type>test-jar</type>
@@ -1283,6 +1299,7 @@
     <module>source-jdbc</module>
     <module>source-kafka</module>
     <module>storage-hbase</module>
+    <module>storage-parquet</module>
     <module>query</module>
     <module>server-base</module>
     <module>server</module>
@@ -1297,6 +1314,7 @@
     <module>metrics-reporter-kafka</module>
     <module>cache</module>
     <module>datasource-sdk</module>
+    <module>kylin-test</module>
   </modules>
 
   <reporting>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 467ef82..4fdcf5d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -20,8 +20,10 @@ package org.apache.kylin.rest.init;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.ext.ClassLoaderUtils;
 import org.apache.kylin.rest.metrics.QueryMetrics2Facade;
 import org.apache.kylin.rest.metrics.QueryMetricsFacade;
+import org.apache.spark.sql.SparderEnv;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -37,6 +39,8 @@ public class InitialTaskManager implements InitializingBean {
     public void afterPropertiesSet() throws Exception {
         logger.info("Kylin service is starting.....");
 
+        checkAndInitSpark();
+
         runInitialTasks();
     }
 
@@ -62,4 +66,26 @@ public class InitialTaskManager implements InitializingBean {
             logger.info("All initial tasks finished.");
         }
     }
+
+    private void checkAndInitSpark() {
+        boolean hasSparkJar = true;
+        // In unit tests the Spark jar is not on the classpath.
+        try {
+            Class.forName("org.apache.spark.sql.SparkSession");
+        } catch (ClassNotFoundException e) {
+            logger.info("Can not find org.apache.spark.sql.SparkSession.Spark has not started.");
+            hasSparkJar = false;
+        }
+
+        if (hasSparkJar) {
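+            // Swap the Spark class loader in for SparderEnv initialization, then restore the original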
+            ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+            try {
+                SparderEnv.init();
+            } catch (Throwable ex) {
+                logger.error("Initial Spark Context at starting failed", ex);
+            }
+            Thread.currentThread().setContextClassLoader(originClassLoader);
+        }
+    }
 }
diff --git a/server/pom.xml b/server/pom.xml
index a898eff..d4427c2 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -305,6 +305,13 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql-connector.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index db28595..c0519e6 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.rest;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 
@@ -80,6 +81,12 @@ public class DebugTomcat {
     private static void overrideDevJobJarLocations() {
         KylinConfig conf = KylinConfig.getInstanceFromEnv();
         File devJobJar = findFile("../assembly/target", "kylin-assembly-.*-SNAPSHOT-job.jar");
+        File sparkJar = findFile("../storage-parquet/target", "kylin-storage-parquet-.*-SNAPSHOT-spark.jar");
+        try {
+            if (sparkJar != null) {
+                System.setProperty("kylin.query.parquet-additional-jars", sparkJar.getCanonicalPath());
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
         if (devJobJar != null) {
             conf.overrideMRJobJarPath(devJobJar.getAbsolutePath());
         }
@@ -110,8 +117,11 @@ public class DebugTomcat {
 
         File webBase = new File("../webapp/app");
         File webInfDir = new File(webBase, "WEB-INF");
+        File metaInfDir = new File(webBase, "META-INF");
         FileUtils.deleteDirectory(webInfDir);
+        FileUtils.deleteDirectory(metaInfDir);
         FileUtils.copyDirectoryToDirectory(new File("../server/src/main/webapp/WEB-INF"), webBase);
+        FileUtils.copyDirectoryToDirectory(new File("../examples/test_case_data/webapps/META-INF"), webBase);
 
         Tomcat tomcat = new Tomcat();
         tomcat.setPort(port);
diff --git a/server/src/main/webapp/META-INF/context.xml b/server/src/main/webapp/META-INF/context.xml
new file mode 100644
index 0000000..d988ae1
--- /dev/null
+++ b/server/src/main/webapp/META-INF/context.xml
@@ -0,0 +1,38 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!-- The contents of this file will be loaded for each web application -->
+<Context allowLinking="true">
+
+    <!-- Default set of monitored resources -->
+    <WatchedResource>WEB-INF/web.xml</WatchedResource>
+
+    <!-- Uncomment this to disable session persistence across Tomcat restarts -->
+    <!--
+    <Manager pathname="" />
+    -->
+
+    <!-- Uncomment this to enable Comet connection tacking (provides events
+         on session expiration as well as webapp lifecycle) -->
+    <!--
+    <Valve className="org.apache.catalina.valves.CometConnectionManagerValve" />
+    -->
+
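+    <!-- Kylin's custom web class loader; presumably needed so Spark classes resolve in their own loader -->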
+    <Loader loaderClass="org.apache.kylin.ext.TomcatClassLoader"/>
+
+</Context>
diff --git a/storage-parquet/pom.xml b/storage-parquet/pom.xml
new file mode 100644
index 0000000..3778031
--- /dev/null
+++ b/storage-parquet/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+ 
+     http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-storage-parquet</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache Kylin - Parquet Storage</name>
+    <description>Apache Kylin - Parquet Storage</description>
+
+    <parent>
+        <groupId>org.apache.kylin</groupId>
+        <artifactId>kylin</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+    </parent>
+
+    <properties>
+        <shadeBase>org.apache.kylin.coprocessor.shaded</shadeBase>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metadata</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-mr</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-spark</artifactId>
+        </dependency>
+
+        <!-- Env & Test -->
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-storage</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Spark dependency -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.11</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>spark</shadedClassifierName>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                            </transformers>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.kylin:kylin-core-common</include>
+                                    <include>org.apache.kylin:kylin-core-metadata</include>
+                                    <include>org.apache.kylin:kylin-core-dictionary</include>
+                                    <include>org.apache.kylin:kylin-core-cube</include>
+                                    <include>org.apache.kylin:kylin-engine-spark</include>
+                                    <include>com.tdunning:t-digest</include>
+                                </includes>
+                            </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.tdunning</pattern>
+                                    <shadedPattern>${shadeBase}.com.tdunning</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
index 6a3ad59..a3ce76f 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.storage.parquet.cube;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -38,6 +39,6 @@ public class CubeStorageQuery extends GTCubeStorageQueryBase {
 
     @Override
     protected String getGTStorage() {
-        return null;
+        return KylinConfig.getInstanceFromEnv().getSparkCubeGTStorage();
     }
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
new file mode 100644
index 0000000..9096679
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark;
+
+import java.util.List;
+
+public class ParquetPayload {
+    private byte[] gtScanRequest;
+    private String gtScanRequestId;
+    private String kylinProperties;
+    private String realizationId;
+    private String segmentId;
+    private String dataFolderName;
+    private int maxRecordLength;
+    private List<Integer> parquetColumns;
+    private boolean isUseII;
+    private String realizationType;
+    private String queryId;
+    private boolean spillEnabled;
+    private long maxScanBytes;
+    private long startTime;
+    private int storageType;
+
+    private ParquetPayload(byte[] gtScanRequest, String gtScanRequestId, String kylinProperties, String realizationId,
+                           String segmentId, String dataFolderName, int maxRecordLength, List<Integer> parquetColumns,
+                           boolean isUseII, String realizationType, String queryId, boolean spillEnabled, long maxScanBytes,
+                           long startTime, int storageType) {
+        this.gtScanRequest = gtScanRequest;
+        this.gtScanRequestId = gtScanRequestId;
+        this.kylinProperties = kylinProperties;
+        this.realizationId = realizationId;
+        this.segmentId = segmentId;
+        this.dataFolderName = dataFolderName;
+        this.maxRecordLength = maxRecordLength;
+        this.parquetColumns = parquetColumns;
+        this.isUseII = isUseII;
+        this.realizationType = realizationType;
+        this.queryId = queryId;
+        this.spillEnabled = spillEnabled;
+        this.maxScanBytes = maxScanBytes;
+        this.startTime = startTime;
+        this.storageType = storageType;
+    }
+
+    public byte[] getGtScanRequest() {
+        return gtScanRequest;
+    }
+
+    public String getGtScanRequestId() {
+        return gtScanRequestId;
+    }
+
+    public String getKylinProperties() {
+        return kylinProperties;
+    }
+
+    public String getRealizationId() {
+        return realizationId;
+    }
+
+    public String getSegmentId() {
+        return segmentId;
+    }
+
+    public String getDataFolderName() {
+        return dataFolderName;
+    }
+
+    public int getMaxRecordLength() {
+        return maxRecordLength;
+    }
+
+    public List<Integer> getParquetColumns() {
+        return parquetColumns;
+    }
+
+    public boolean isUseII() {
+        return isUseII;
+    }
+
+    public String getRealizationType() {
+        return realizationType;
+    }
+
+    public String getQueryId() {
+        return queryId;
+    }
+
+    public boolean isSpillEnabled() {
+        return spillEnabled;
+    }
+
+    public long getMaxScanBytes() {
+        return maxScanBytes;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public int getStorageType() {
+        return storageType;
+    }
+
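+    /**
+     * Fluent builder for {@link ParquetPayload}. An illustrative usage (field values are placeholders):
+     *
+     *   ParquetPayload payload = new ParquetPayload.ParquetPayloadBuilder()
+     *           .setGtScanRequest(scanRequestBytes)
+     *           .setRealizationId(cubeName)
+     *           .setSegmentId(segmentId)
+     *           .setDataFolderName(folder)
+     *           .createParquetPayload();
+     */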
+    public static class ParquetPayloadBuilder {
+        private byte[] gtScanRequest;
+        private String gtScanRequestId;
+        private String kylinProperties;
+        private String realizationId;
+        private String segmentId;
+        private String dataFolderName;
+        private int maxRecordLength;
+        private List<Integer> parquetColumns;
+        private boolean isUseII;
+        private String realizationType;
+        private String queryId;
+        private boolean spillEnabled;
+        private long maxScanBytes;
+        private long startTime;
+        private int storageType;
+
+        public ParquetPayloadBuilder() {
+        }
+
+        public ParquetPayloadBuilder setGtScanRequest(byte[] gtScanRequest) {
+            this.gtScanRequest = gtScanRequest;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setGtScanRequestId(String gtScanRequestId) {
+            this.gtScanRequestId = gtScanRequestId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setKylinProperties(String kylinProperties) {
+            this.kylinProperties = kylinProperties;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setRealizationId(String realizationId) {
+            this.realizationId = realizationId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setSegmentId(String segmentId) {
+            this.segmentId = segmentId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setDataFolderName(String dataFolderName) {
+            this.dataFolderName = dataFolderName;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setMaxRecordLength(int maxRecordLength) {
+            this.maxRecordLength = maxRecordLength;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setParquetColumns(List<Integer> parquetColumns) {
+            this.parquetColumns = parquetColumns;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setIsUseII(boolean isUseII) {
+            this.isUseII = isUseII;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setRealizationType(String realizationType) {
+            this.realizationType = realizationType;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setQueryId(String queryId) {
+            this.queryId = queryId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setSpillEnabled(boolean spillEnabled) {
+            this.spillEnabled = spillEnabled;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setMaxScanBytes(long maxScanBytes) {
+            this.maxScanBytes = maxScanBytes;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setStartTime(long startTime) {
+            this.startTime = startTime;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setStorageType(int storageType) {
+            this.storageType = storageType;
+            return this;
+        }
+
+        public ParquetPayload createParquetPayload() {
+            return new ParquetPayload(gtScanRequest, gtScanRequestId, kylinProperties, realizationId, segmentId,
+                    dataFolderName, maxRecordLength, parquetColumns, isUseII, realizationType, queryId, spillEnabled,
+                    maxScanBytes, startTime, storageType);
+        }
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
new file mode 100644
index 0000000..611ee44
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparderEnv$;
+import org.apache.spark.sql.manager.UdfManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.sql.functions.asc;
+import static org.apache.spark.sql.functions.callUDF;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.sum;
+
+@SuppressWarnings("serial")
+public class ParquetTask implements Serializable {
+    public static final Logger logger = LoggerFactory.getLogger(ParquetTask.class);
+
+    private final transient JavaSparkContext sc;
+    private final KylinConfig kylinConfig;
+    private final transient Configuration conf;
+    private final transient String[] parquetPaths;
+    private final transient GTScanRequest scanRequest;
+    private final transient CuboidToGridTableMapping mapping;
+
+    ParquetTask(ParquetPayload request) {
+        try {
+            this.sc = JavaSparkContext.fromSparkContext(SparderEnv$.MODULE$.getSparkSession().sparkContext());
+            this.kylinConfig = KylinConfig.getInstanceFromEnv();
+            this.conf = HadoopUtil.getCurrentConfiguration();
+
+            scanRequest = GTScanRequest.serializer
+                    .deserialize(ByteBuffer.wrap(request.getGtScanRequest()));
+
+            long startTime = System.currentTimeMillis();
+            sc.setLocalProperty("spark.job.description", Thread.currentThread().getName());
+
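+            // Route the job to a fair-scheduler pool by query priority ("vip_tasks"/"lightweight_tasks"
+            // are assumed to be defined in the cluster's fair scheduler allocation file)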
+            if (QueryContext.current().isHighPriorityQuery()) {
+                sc.setLocalProperty("spark.scheduler.pool", "vip_tasks");
+            } else {
+                sc.setLocalProperty("spark.scheduler.pool", "lightweight_tasks");
+            }
+
+            String dataFolderName = request.getDataFolderName();
+
+            String baseFolder = dataFolderName.substring(0, dataFolderName.lastIndexOf('/'));
+            String cuboidId = dataFolderName.substring(dataFolderName.lastIndexOf('/') + 1);
+            String prefix = "cuboid_" + cuboidId + "_";
+
+            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(request.getRealizationId());
+            CubeSegment cubeSegment = cubeInstance.getSegmentById(request.getSegmentId());
+            mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeSegment.getCuboidScheduler(), Long.valueOf(cuboidId)));
+
+            Path[] filePaths = HadoopUtil.getFilteredPath(HadoopUtil.getWorkingFileSystem(conf), new Path(baseFolder), prefix);
+            parquetPaths = new String[filePaths.length];
+
+            for (int i = 0; i < filePaths.length; i++) {
+                parquetPaths[i] = filePaths[i].toString();
+            }
+
+            cleanHadoopConf(conf);
+
+            logger.info("SparkVisit Init takes {} ms", System.currentTimeMillis() - startTime);
+
+            StringBuilder pathBuilder = new StringBuilder();
+            for (Path p : filePaths) {
+                pathBuilder.append(p.toString()).append(";");
+            }
+
+            logger.info("Columnar path is " + pathBuilder.toString());
+            logger.info("Required Measures: " + StringUtils.join(request.getParquetColumns(), ","));
+            logger.info("Max GT length: " + request.getMaxRecordLength());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * The updatingResource map inside Configuration incurs gzip compression in Configuration.write;
+     * clearing it out improves query throughput (QPS).
+     */
+    private void cleanHadoopConf(Configuration c) {
+        try {
+            //updatingResource will get compressed by gzip, which is costly
+            Field updatingResourceField = Configuration.class.getDeclaredField("updatingResource");
+            updatingResourceField.setAccessible(true);
+            Map<String, String[]> map = (Map<String, String[]>) updatingResourceField.get(c);
+            map.clear();
+
+        } catch (IllegalAccessException | NoSuchFieldException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Iterator<Object[]> executeTask() {
+        logger.info("Start to visit cube data with Spark SQL <<<<<<");
+
+        SQLContext sqlContext = new SQLContext(SparderEnv.getSparkSession().sparkContext());
+
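+        // Translate the GTScanRequest into a Spark SQL plan over the cuboid's parquet files:
+        // select -> filter (pushed-down SQL) -> group-by/aggregate -> sort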
+        Dataset<Row> dataset = sqlContext.read().parquet(parquetPaths);
+        ImmutableBitSet dimensions = scanRequest.getDimensions();
+        ImmutableBitSet metrics = scanRequest.getAggrMetrics();
+        ImmutableBitSet groupBy = scanRequest.getAggrGroupBy();
+
+        // select
+        Column[] selectColumn = getSelectColumn(dimensions, metrics, mapping);
+        dataset = dataset.select(selectColumn);
+
+        // where
+        String where = scanRequest.getFilterPushDownSQL();
+        if (where != null) {
+            dataset = dataset.filter(where);
+        }
+
+        //groupby agg
+        Column[] aggCols = getAggColumns(metrics, mapping);
+        Column[] tailCols;
+
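+        // Dataset.agg(Column, Column...) takes the first aggregate separately,
+        // so split the head column from the tail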
+        if (aggCols.length >= 1) {
+            tailCols = new Column[aggCols.length - 1];
+            System.arraycopy(aggCols, 1, tailCols, 0, tailCols.length);
+            dataset = dataset.groupBy(getGroupByColumn(dimensions, mapping)).agg(aggCols[0], tailCols);
+        }
+
+        // sort
+        dataset = dataset.sort(getSortColumn(groupBy, mapping));
+
+        JavaRDD<Row> rowRDD = dataset.javaRDD();
+
+        JavaRDD<Object[]> objRDD = rowRDD.map(new Function<Row, Object[]>() {
+            @Override
+            public Object[] call(Row row) throws Exception {
+                Object[] objects = new Object[row.length()];
+                for (int i = 0; i < row.length(); i++) {
+                    objects[i] = row.get(i);
+                }
+                return objects;
+            }
+        });
+
+        logger.info("partitions: {}", objRDD.getNumPartitions());
+
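+        // collect() brings all rows to the driver; assumed safe here because rows are already aggregated per cuboid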
+        List<Object[]> result = objRDD.collect();
+        return result.iterator();
+    }
+
+    private Column[] getAggColumns(ImmutableBitSet metrics, CuboidToGridTableMapping mapping) {
+        Column[] columns = new Column[metrics.trueBitCount()];
+        Map<MeasureDesc, Integer> met2gt = mapping.getMet2gt();
+
+        for (int i = 0; i < metrics.trueBitCount(); i++) {
+            int c = metrics.trueBitAt(i);
+            for (Map.Entry<MeasureDesc, Integer> entry : met2gt.entrySet()) {
+                if (entry.getValue() == c) {
+                    MeasureDesc measureDesc = entry.getKey();
+                    String func = measureDesc.getFunction().getExpression();
+                    columns[i] = getAggColumn(measureDesc.getName(), func, measureDesc.getFunction().getReturnDataType());
+                    break;
+                }
+            }
+        }
+
+        return columns;
+    }
+
+    private Column getAggColumn(String metName, String func, DataType dataType) {
+        Column column;
+        switch (func) {
+            case "SUM":
+                column = sum(metName);
+                break;
+            case "MIN":
+                column = min(metName);
+                break;
+            case "MAX":
+                column = max(metName);
+                break;
+            case "COUNT":
+                column = sum(metName);
+                break;
+            case "TOP_N":
+            case "COUNT_DISTINCT":
+            case "EXTENDED_COLUMN":
+            case "PERCENTILE_APPROX":
+            case "RAW":
+                String udf = UdfManager.register(dataType, func);
+                column = callUDF(udf, col(metName));
+                break;
+            default:
+                throw new IllegalArgumentException("Function " + func + " is not supported");
+
+        }
+        return column.alias(metName);
+    }
+
+    private void getDimColumn(ImmutableBitSet dimensions, Column[] columns, int from, CuboidToGridTableMapping mapping) {
+        Map<TblColRef, Integer> dim2gt = mapping.getDim2gt();
+
+        for (int i = 0; i < dimensions.trueBitCount(); i++) {
+            int c = dimensions.trueBitAt(i);
+            for (Map.Entry<TblColRef, Integer> entry : dim2gt.entrySet()) {
+                if (entry.getValue() == c) {
+                    columns[i + from] = col(entry.getKey().getTableAlias() + "_" + entry.getKey().getName());
+                    break;
+                }
+            }
+        }
+    }
+
+    private void getMetColumn(ImmutableBitSet metrics, Column[] columns, int from, CuboidToGridTableMapping mapping) {
+        Map<MeasureDesc, Integer> met2gt = mapping.getMet2gt();
+
+        for (int i = 0; i < metrics.trueBitCount(); i++) {
+            int m = metrics.trueBitAt(i);
+            for (Map.Entry<MeasureDesc, Integer> entry : met2gt.entrySet()) {
+                if (entry.getValue() == m) {
+                    columns[i + from] = col(entry.getKey().getName());
+                    break;
+                }
+            }
+        }
+    }
+
+    private Column[] getSelectColumn(ImmutableBitSet dimensions, ImmutableBitSet metrics, CuboidToGridTableMapping mapping) {
+        Column[] columns = new Column[dimensions.trueBitCount() + metrics.trueBitCount()];
+
+        getDimColumn(dimensions, columns, 0, mapping);
+
+        getMetColumn(metrics, columns, dimensions.trueBitCount(), mapping);
+
+        return columns;
+    }
+
+    private Column[] getGroupByColumn(ImmutableBitSet dimensions, CuboidToGridTableMapping mapping) {
+        Column[] columns = new Column[dimensions.trueBitCount()];
+
+        getDimColumn(dimensions, columns, 0, mapping);
+
+        return columns;
+    }
+
+    private Column[] getSortColumn(ImmutableBitSet dimensions, CuboidToGridTableMapping mapping) {
+        Column[] columns = new Column[dimensions.trueBitCount()];
+        Map<TblColRef, Integer> dim2gt = mapping.getDim2gt();
+
+        for (int i = 0; i < dimensions.trueBitCount(); i++) {
+            int c = dimensions.trueBitAt(i);
+            for (Map.Entry<TblColRef, Integer> entry : dim2gt.entrySet()) {
+                if (entry.getValue() == c) {
+                    columns[i] = asc(entry.getKey().getTableAlias() + "_" + entry.getKey().getName());
+                    break;
+                }
+            }
+        }
+
+        return columns;
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
new file mode 100644
index 0000000..1c8c246
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark;
+
+import org.apache.kylin.ext.ClassLoaderUtils;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner;
+import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner4Cube;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SparkSubmitter {
+    public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
+
+    public static IGTScanner submitParquetTask(GTScanRequest scanRequest, ParquetPayload payload) {
+
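+        // Make sure Spark classes resolve through the dedicated Spark class loader on this thread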
+        Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+        ParquetTask parquetTask = new ParquetTask(payload);
+
+        ParquetRecordGTScanner scanner = new ParquetRecordGTScanner4Cube(scanRequest.getInfo(), parquetTask.executeTask(), scanRequest,
+                payload.getMaxScanBytes());
+
+        return scanner;
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
new file mode 100644
index 0000000..322aec1
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark.gtscanner;
+
+import com.google.common.collect.Iterators;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Tracks rows and bytes scanned, and aborts the scan when the query is interrupted or resource limits are exceeded.
+ */
+public abstract class ParquetRecordGTScanner implements IGTScanner {
+
+    private Iterator<Object[]> iterator;
+    private GTInfo info;
+    private GTRecord gtrecord;
+    private ImmutableBitSet columns;
+
+    private long maxScannedBytes;
+
+    private long scannedRows;
+    private long scannedBytes;
+
+    private ImmutableBitSet[] columnBlocks;
+
+    public ParquetRecordGTScanner(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
+                                  long maxScannedBytes) {
+        this.iterator = iterator;
+        this.info = info;
+        this.gtrecord = new GTRecord(info);
+        this.columns = scanRequest.getColumns();
+        this.maxScannedBytes = maxScannedBytes;
+        this.columnBlocks = getParquetCoveredColumnBlocks(scanRequest);
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    public long getTotalScannedRowCount() {
+        return scannedRows;
+    }
+
+    public long getTotalScannedRowBytes() {
+        return scannedBytes;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return Iterators.transform(iterator, new com.google.common.base.Function<Object[], GTRecord>() {
+            @Nullable
+            @Override
+            public GTRecord apply(@Nullable Object[] input) {
+                gtrecord.setValuesParquet(ParquetRecordGTScanner.this.columns, new ByteArray(info.getMaxColumnLength(ParquetRecordGTScanner.this.columns)), input);
+
+                scannedBytes += info.getMaxColumnLength(ParquetRecordGTScanner.this.columns);
+                if ((++scannedRows % GTScanRequest.terminateCheckInterval == 1) && Thread.interrupted()) {
+                    throw new KylinTimeoutException("Query timeout");
+                }
+                if (scannedBytes > maxScannedBytes) {
+                    throw new ResourceLimitExceededException(
+                            "Partition scanned bytes " + scannedBytes + " exceeds threshold " + maxScannedBytes
+                                    + ", consider increase kylin.storage.partition.max-scan-bytes");
+                }
+                return gtrecord;
+            }
+        });
+    }
+
+    abstract protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest);
+
+    abstract protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest);
+
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
new file mode 100644
index 0000000..3bf670e
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark.gtscanner;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTScanRequest;
+
+import java.util.BitSet;
+import java.util.Iterator;
+
+public class ParquetRecordGTScanner4Cube extends ParquetRecordGTScanner {
+    public ParquetRecordGTScanner4Cube(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
+                                       long maxScannedBytes) {
+        super(info, iterator, scanRequest, maxScannedBytes);
+    }
+
+    protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest) {
+        BitSet bs = new BitSet();
+
+        ImmutableBitSet dimensions = scanRequest.getInfo().getPrimaryKey();
+        for (int i = 0; i < dimensions.trueBitCount(); ++i) {
+            bs.set(dimensions.trueBitAt(i));
+        }
+
+        ImmutableBitSet queriedColumns = scanRequest.getColumns();
+        for (int i = 0; i < queriedColumns.trueBitCount(); ++i) {
+            bs.set(queriedColumns.trueBitAt(i));
+        }
+        return new ImmutableBitSet(bs);
+    }
+
+    protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest) {
+        ImmutableBitSet selectedColBlocksBitSet = scanRequest.getSelectedColBlocks();
+        ImmutableBitSet[] selectedColBlocks = new ImmutableBitSet[selectedColBlocksBitSet.trueBitCount()];
+
+        for (int i = 0; i < selectedColBlocksBitSet.trueBitCount(); i++) {
+            selectedColBlocks[i] = scanRequest.getInfo().getColumnBlock(selectedColBlocksBitSet.trueBitAt(i));
+        }
+
+        return selectedColBlocks;
+    }
+
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
index 2a7c1ee..def4d8d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
@@ -46,6 +46,7 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.kv.RowKeyDecoderParquet;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.dimension.AbstractDateDimEnc;
 import org.apache.kylin.dimension.DimensionEncoding;
@@ -67,6 +68,8 @@ import org.apache.kylin.measure.basic.BasicMeasureType;
 import org.apache.kylin.measure.basic.BigDecimalIngester;
 import org.apache.kylin.measure.basic.DoubleIngester;
 import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.parquet.example.data.Group;
@@ -90,10 +93,10 @@ import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 
 public class SparkCubeParquet extends AbstractApplication implements Serializable{
@@ -151,6 +154,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
         try (JavaSparkContext sc = new JavaSparkContext(conf)){
             sc.sc().addSparkListener(jobListener);
+
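+            // Clear output left over from a previous attempt before writing.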
+            HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
             final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
 
             final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
@@ -158,7 +163,6 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
             final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
-
             final FileSystem fs = new Path(inputPath).getFileSystem(sc.hadoopConfiguration());
 
             final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
@@ -175,6 +179,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
                 saveToParquet(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
             }
 
+            logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
+
             Map<String, String> counterMap = Maps.newHashMap();
             counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
 
@@ -217,11 +223,12 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
     }
 
     static class CuboidPartitioner extends Partitioner {
-
         private CuboidToPartitionMapping mapping;
+        private boolean enableSharding;
 
-        public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
+        public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping, boolean enableSharding) {
             this.mapping = cuboidToPartitionMapping;
+            this.enableSharding = enableSharding;
         }
 
         @Override
@@ -330,6 +337,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             int partition = (int) (cuboidSize / (rddCut * 10));
             partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
             partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+            logger.info("cuboid:{}, est_size:{}, partitions:{}", cuboidId, cuboidSize, partition);
             return partition;
         }
 
@@ -377,6 +385,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         private Map<MeasureDesc, String> meaTypeMap;
         private GroupFactory factory;
         private BufferedMeasureCodec measureCodec;
+        private BigDecimalSerializer serializer;
+        private int count = 0;
 
         public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
             this.cubeName = cubeName;
@@ -394,15 +404,15 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             CubeDesc cubeDesc = cubeInstance.getDescriptor();
             CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
             measureDescs = cubeDesc.getMeasures();
-            decoder = new RowKeyDecoder(cubeSegment);
+            decoder = new RowKeyDecoderParquet(cubeSegment);
             factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
             measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+            serializer = new BigDecimalSerializer(DataType.getType("decimal"));
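+            // Publish the flag only after all codecs and serializers are ready;
+            // call() double-checks it before running this lazy init.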
+            initialized = true;
         }
 
         @Override
         public Tuple2<Void, Group> call(Tuple2<Text, Text> tuple) throws Exception {
-
-            logger.debug("call: transfer Text to byte[]");
             if (initialized == false) {
                 synchronized (SparkCubeParquet.class) {
                     if (initialized == false) {
@@ -412,7 +422,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
                 }
             }
 
-            long cuboid = decoder.decode(tuple._1.getBytes());
+            long cuboid = decoder.decode4Parquet(tuple._1.getBytes());
             List<String> values = decoder.getValues();
             List<TblColRef> columns = decoder.getColumns();
 
@@ -426,8 +436,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
                 parseColValue(group, column, values.get(i));
             }
 
+            count++;
 
-            byte[] encodedBytes = tuple._2().copyBytes();
+            byte[] encodedBytes = tuple._2().getBytes();
             int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
 
             int valueOffset = 0;
@@ -465,11 +476,16 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             }
             switch (meaTypeMap.get(measureDesc)) {
                 case "long":
-                    group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
+                    group.append(measureDesc.getName(), BytesUtil.readVLong(ByteBuffer.wrap(value, offset, length)));
                     break;
                 case "double":
                     group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
                     break;
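+                // Decimal measures: deserialize, normalize to scale 4 (this assumes the
+                // Parquet schema declares the same fixed scale) and append the unscaled
+                // value's bytes.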
+                case "decimal":
+                    BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
+                    decimal = decimal.setScale(4);
+                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
+                    break;
                 default:
                     group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
                     break;
@@ -492,14 +508,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
                 colTypeMap.put(colRef, "long");
             } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
                 org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
-                if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
-                    builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
-                    colTypeMap.put(colRef, "long");
-                } else {
-                    // stringFamily && default
-                    builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
-                    colTypeMap.put(colRef, "string");
-                }
+                builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
+                colTypeMap.put(colRef, "string");
             } else {
                 builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
                 colTypeMap.put(colRef, "int");
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ClassLoaderUtils.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ClassLoaderUtils.java
new file mode 100644
index 0000000..3888883
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ClassLoaderUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URLClassLoader;
+
+public final class ClassLoaderUtils {
+    static URLClassLoader sparkClassLoader = null;
+    static URLClassLoader originClassLoader = null;
+    private static Logger logger = LoggerFactory.getLogger(ClassLoaderUtils.class);
+    private static Boolean isPlus = true;
+
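+    /**
+     * Returns the first file under {@code dir} whose name matches the regex {@code ptn},
+     * or null if none matches, e.g. findFile(sparkHome + "/jars", "spark-yarn_.*.jar").
+     */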
+    public static File findFile(String dir, String ptn) {
+        File[] files = new File(dir).listFiles();
+        if (files != null) {
+            for (File f : files) {
+                if (f.getName().matches(ptn))
+                    return f;
+            }
+        }
+        return null;
+    }
+
+    public static ClassLoader getSparkClassLoader() {
+        if (!isPlus) {
+            return Thread.currentThread().getContextClassLoader();
+        }
+        if (sparkClassLoader == null) {
+            logger.error("sparkClassLoader not init");
+            return Thread.currentThread().getContextClassLoader();
+        } else {
+            return sparkClassLoader;
+        }
+    }
+
+    public static void setSparkClassLoader(URLClassLoader classLoader) {
+        if (sparkClassLoader != null) {
+            logger.error("sparkClassLoader already initialized");
+        }
+        logger.info("set sparkClassLoader :" + classLoader);
+        if (System.getenv("DEBUG_SPARK_CLASSLOADER") != null) {
+            return;
+        }
+        sparkClassLoader = classLoader;
+    }
+
+    public static ClassLoader getOriginClassLoader() {
+        if (!isPlus) {
+            return Thread.currentThread().getContextClassLoader();
+        }
+        if (originClassLoader == null) {
+            logger.error("originClassLoader not init");
+            return Thread.currentThread().getContextClassLoader();
+        } else {
+            return originClassLoader;
+        }
+    }
+
+    public static void setOriginClassLoader(URLClassLoader classLoader) {
+        if (originClassLoader != null) {
+            logger.error("originClassLoader already initialized");
+        }
+        logger.info("set originClassLoader :" + classLoader);
+        originClassLoader = classLoader;
+    }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
new file mode 100644
index 0000000..a0c212c
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.apache.catalina.loader.ParallelWebappClassLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+
+public class DebugTomcatClassLoader extends ParallelWebappClassLoader {
+    private static final String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+            // Java standard library:
+            "com.sun.", "launcher.", "javax.", "org.ietf", "java", "org.omg", "org.w3c", "org.xml", "sunw.",
+            // logging
+            "org.slf4j", "org.apache.commons.logging", "org.apache.log4j", "org.apache.catalina", "org.apache.tomcat" };
+    private static final String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "org.apache.kylin",
+            "org.apache.calcite" };
+    private static final String[] CODE_GEN_CLASS = new String[] { "org.apache.spark.sql.catalyst.expressions.Object",
+            "Baz" };
+
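+    // Class names that Calcite's code generator probes while compiling; they never
+    // resolve, so loadClass fails fast on them instead of searching the classpath.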
+    private static final Set<String> wontFindClasses = new HashSet<>();
+
+    static {
+        wontFindClasses.add("Class");
+        wontFindClasses.add("Object");
+        wontFindClasses.add("org");
+        wontFindClasses.add("java.lang.org");
+        wontFindClasses.add("java.lang$org");
+        wontFindClasses.add("java$lang$org");
+        wontFindClasses.add("org.apache");
+        wontFindClasses.add("org.apache.calcite");
+        wontFindClasses.add("org.apache.calcite.runtime");
+        wontFindClasses.add("org.apache.calcite.linq4j");
+        wontFindClasses.add("Long");
+        wontFindClasses.add("String");
+    }
+
+    private static Logger logger = LoggerFactory.getLogger(DebugTomcatClassLoader.class);
+    private SparkClassLoader sparkClassLoader;
+
+    /**
+     * Creates a classloader that can load classes dynamically
+     * from jar files under a specific folder.
+     *
+     * @param parent the parent ClassLoader to set.
+     */
+    public DebugTomcatClassLoader(ClassLoader parent) throws IOException {
+        super(parent);
+        sparkClassLoader = new SparkClassLoader(this);
+        ClassLoaderUtils.setSparkClassLoader(sparkClassLoader);
+        ClassLoaderUtils.setOriginClassLoader(this);
+        init();
+    }
+
+    public void init() {
+
+        String classPath = System.getProperty("java.class.path");
+        if (classPath == null) {
+            throw new RuntimeException("");
+        }
+        String[] jars = classPath.split(File.pathSeparator);
+        for (String jar : jars) {
+            try {
+                URL url = new File(jar).toURI().toURL();
+                addURL(url);
+            } catch (MalformedURLException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        if (isWontFind(name)) {
+            throw new ClassNotFoundException();
+        }
+        if (isCodeGen(name)) {
+            throw new ClassNotFoundException();
+        }
+
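+        // Delegation order from here on (a sketch): kylin.ext classes go to the parent
+        // so every loader shares them; Spark-preempted classes (e.g. "org.apache.spark.rdd.RDD")
+        // go to sparkClassLoader; JDK/logging prefixes go straight to the parent;
+        // everything else follows the normal webapp rules.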
+        if (name.startsWith("org.apache.kylin.ext")) {
+            return parent.loadClass(name);
+        }
+
+        if (name.startsWith("org.apache.spark.sql.execution.datasources.sparder.batch.SparderBatchFileFormat")) {
+            return super.loadClass(name, resolve);
+        }
+
+        if (sparkClassLoader.classNeedPreempt(name)) {
+            return sparkClassLoader.loadClass(name);
+        }
+        if (isParentCLPrecedent(name)) {
+            logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
+            return parent.loadClass(name);
+        }
+        return super.loadClass(name, resolve);
+    }
+
+    @Override
+    public InputStream getResourceAsStream(String name) {
+        if (sparkClassLoader.fileNeedPreempt(name)) {
+            return sparkClassLoader.getResourceAsStream(name);
+        }
+        return super.getResourceAsStream(name);
+
+    }
+
+    private boolean isParentCLPrecedent(String name) {
+        for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isWontFind(String name) {
+        return wontFindClasses.contains(name);
+    }
+
+    private boolean isCodeGen(String name) {
+        for (String exemptPrefix : CODE_GEN_CLASS) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
new file mode 100644
index 0000000..0590999
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class ItClassLoader extends URLClassLoader {
+    private static final String[] PARENT_CL_PRECEDENT_CLASS = new String[] {
+            // Java standard library:
+            "com.sun.", "launcher.", "javax.", "org.ietf", "java", "org.omg", "org.w3c", "org.xml", "sunw.",
+            // logging
+            "org.slf4j", "org.apache.commons.logging", "org.apache.log4j", "sun", "org.apache.catalina",
+            "org.apache.tomcat", };
+    private static final String[] THIS_CL_PRECEDENT_CLASS = new String[] { "org.apache.kylin",
+            "org.apache.calcite" };
+    private static final String[] CODE_GEN_CLASS = new String[] { "org.apache.spark.sql.catalyst.expressions.Object" };
+    public static ItClassLoader defaultClassLoad = null;
+    private static Logger logger = LoggerFactory.getLogger(ItClassLoader.class);
+    public ItSparkClassLoader sparkClassLoader;
+    ClassLoader parent;
+
+    /**
+     * Creates a classloader that can load classes dynamically
+     * from jar files under a specific folder.
+     *
+     * @param parent the parent ClassLoader to set.
+     */
+    public ItClassLoader(ClassLoader parent) throws IOException {
+        super(((URLClassLoader) getSystemClassLoader()).getURLs());
+        this.parent = parent;
+        sparkClassLoader = new ItSparkClassLoader(this);
+        ClassLoaderUtils.setSparkClassLoader(sparkClassLoader);
+        ClassLoaderUtils.setOriginClassLoader(this);
+        defaultClassLoad = this;
+        init();
+    }
+
+    public void init() {
+
+        String classPath = System.getProperty("java.class.path");
+        if (classPath == null) {
+            throw new RuntimeException("");
+        }
+        String[] jars = classPath.split(File.pathSeparator);
+        for (String jar : jars) {
+            if (jar.contains("spark-")) {
+                continue;
+            }
+            try {
+                URL url = new File(jar).toURI().toURL();
+                addURL(url);
+            } catch (MalformedURLException e) {
+                e.printStackTrace();
+            }
+        }
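+        // For integration tests, also expose spark-yarn from SPARK_HOME and the
+        // sandbox test configuration directory.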
+        String spark_home = System.getenv("SPARK_HOME");
+        try {
+            File sparkJar = findFile(spark_home + "/jars", "spark-yarn_.*.jar");
+            addURL(sparkJar.toURI().toURL());
+            addURL(new File("../examples/test_case_data/sandbox").toURI().toURL());
+        } catch (MalformedURLException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        if (isCodeGen(name)) {
+            throw new ClassNotFoundException();
+        }
+        if (name.startsWith("org.apache.kylin.ext")) {
+            return parent.loadClass(name);
+        }
+        if (isThisCLPrecedent(name)) {
+            synchronized (getClassLoadingLock(name)) {
+                // Check whether the class has already been loaded:
+                Class<?> clasz = findLoadedClass(name);
+                if (clasz != null) {
+                    logger.debug("Class " + name + " already loaded");
+                } else {
+                    try {
+                        // Try to find this class using the URLs passed to this ClassLoader
+                        logger.debug("Finding class: " + name);
+                        clasz = super.findClass(name);
+                    } catch (ClassNotFoundException e) {
+                        // Class not found using this ClassLoader, so delegate to parent
+                        logger.debug("Class " + name + " not found - delegating to parent");
+                        try {
+                            clasz = parent.loadClass(name);
+                        } catch (ClassNotFoundException e2) {
+                            // Class not found in this ClassLoader or in the parent ClassLoader
+                            // Log some debug output before re-throwing ClassNotFoundException
+                            logger.debug("Class " + name + " not found in parent loader");
+                            throw e2;
+                        }
+                    }
+                }
+                return clasz;
+            }
+        }
+        // The checks were reordered so that codegen classes get loaded by the parent classloader
+        if (isParentCLPrecedent(name)) {
+            logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
+            return parent.loadClass(name);
+        }
+        if (sparkClassLoader.classNeedPreempt(name)) {
+            return sparkClassLoader.loadClass(name);
+        }
+        return super.loadClass(name, resolve);
+    }
+
+    @Override
+    public InputStream getResourceAsStream(String name) {
+        if (sparkClassLoader.fileNeedPreempt(name)) {
+            return sparkClassLoader.getResourceAsStream(name);
+        }
+        return super.getResourceAsStream(name);
+
+    }
+
+    private boolean isParentCLPrecedent(String name) {
+        for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASS) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isThisCLPrecedent(String name) {
+        for (String exemptPrefix : THIS_CL_PRECEDENT_CLASS) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isCodeGen(String name) {
+        for (String exemptPrefix : CODE_GEN_CLASS) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
new file mode 100644
index 0000000..c69ef9c
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Set;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class ItSparkClassLoader extends URLClassLoader {
+    private static final String[] SPARK_CL_PREEMPT_CLASSES = new String[] { "org.apache.spark", "scala.",
+            "org.spark_project"
+            //            "javax.ws.rs.core.Application",
+            //            "javax.ws.rs.core.UriBuilder", "org.glassfish.jersey", "javax.ws.rs.ext"
+            //user javax.ws.rs.api 2.01  not jersey-core-1.9.jar
+    };
+    private static final String[] SPARK_CL_PREEMPT_FILES = new String[] { "spark-version-info.properties",
+            "HiveClientImpl", "org/apache/spark" };
+
+    private static final String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "javax.ws.rs", "org.apache.hadoop.hive" };
+
+    private static final String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+            //            // Java standard library:
+            "com.sun.", "launcher.", "java.", "javax.", "org.ietf", "org.omg", "org.w3c", "org.xml", "sunw.", "sun.",
+            // logging
+            "org.apache.commons.logging", "org.apache.log4j", "com.hadoop", "org.slf4j",
+            // Hadoop/HBase/ZK:
+            "org.apache.hadoop", "org.apache.zookeeper", "org.apache.kylin", "com.intellij",
+            "org.apache.calcite", "org.roaringbitmap", "org.apache.parquet" };
+    private static final Set<String> classNotFoundCache = Sets.newHashSet();
+    private static Logger logger = LoggerFactory.getLogger(ItSparkClassLoader.class);
+
+    /**
+     * Creates a classloader that can load classes dynamically
+     * from jar files under a specific folder.
+     *
+     * @param parent the parent ClassLoader to set.
+     */
+    protected ItSparkClassLoader(ClassLoader parent) throws IOException {
+        super(new URL[] {}, parent);
+        init();
+    }
+
+    public void init() throws MalformedURLException {
+        String spark_home = System.getenv("SPARK_HOME");
+        if (spark_home == null) {
+            spark_home = System.getProperty("SPARK_HOME");
+            if (spark_home == null) {
+                throw new RuntimeException(
+                        "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+            }
+        }
+        File file = new File(spark_home + "/jars");
+        File[] jars = file.listFiles();
+        for (File jar : jars) {
+            addURL(jar.toURI().toURL());
+        }
+        File sparkJar = findFile("../storage-parquet/target", "kylin-storage-parquet-.*-SNAPSHOT-spark.jar");
+
+        try {
+            // The sparder and query modules contain classes under org.apache.spark. If their
+            // class directories were not added here, those classes would be loaded by the
+            // system classloader (findClass consults the parent first, so the parent cannot
+            // resolve them and the Spark classes would not be found).
+            // Why is this unnecessary for SparkClassLoader? DebugTomcatClassLoader and
+            // TomcatClassLoader search themselves first, so the parent can resolve these
+            // classes and the Spark classes are found.
+            addURL(new File("../engine-spark/target/classes").toURI().toURL());
+            addURL(new File("../engine-spark/target/test-classes").toURI().toURL());
+            addURL(new File("../storage-parquet/target/classes").toURI().toURL());
+            addURL(new File("../query/target/classes").toURI().toURL());
+            addURL(new File("../query/target/test-classes").toURI().toURL());
+            addURL(new File("../udf/target/classes").toURI().toURL());
+            System.setProperty("kylin.query.parquet-additional-jars", sparkJar.getCanonicalPath());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        if (needToUseGlobal(name)) {
+            logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
+            try {
+                return getParent().loadClass(name);
+            } catch (ClassNotFoundException e) {
+                return super.findClass(name);
+            }
+        }
+
+        synchronized (getClassLoadingLock(name)) {
+            // Check whether the class has already been loaded:
+            Class<?> clasz = findLoadedClass(name);
+            if (clasz != null) {
+                logger.debug("Class " + name + " already loaded");
+            } else {
+                try {
+                    // Try to find this class using the URLs passed to this ClassLoader
+                    logger.debug("Finding class: " + name);
+                    clasz = super.findClass(name);
+                    if (clasz == null) {
+                        logger.debug("cannot find class" + name);
+                    }
+                } catch (ClassNotFoundException e) {
+                    classNotFoundCache.add(name);
+                    // Class not found using this ClassLoader, so delegate to parent
+                    logger.debug("Class " + name + " not found - delegating to parent");
+                    try {
+                        clasz = getParent().loadClass(name);
+                    } catch (ClassNotFoundException e2) {
+                        // Class not found in this ClassLoader or in the parent ClassLoader
+                        // Log some debug output before re-throwing ClassNotFoundException
+                        logger.debug("Class " + name + " not found in parent loader");
+                        throw e2;
+                    }
+                }
+            }
+            return clasz;
+        }
+    }
+
+    private boolean isThisCLPrecedent(String name) {
+        for (String exemptPrefix : THIS_CL_PRECEDENT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isParentCLPrecedent(String name) {
+        for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean needToUseGlobal(String name) {
+        return !isThisCLPrecedent(name) && !classNeedPreempt(name) && isParentCLPrecedent(name);
+    }
+
+    boolean classNeedPreempt(String name) {
+        if (classNotFoundCache.contains(name)) {
+            return false;
+        }
+        for (String exemptPrefix : SPARK_CL_PREEMPT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    boolean fileNeedPreempt(String name) {
+
+        for (String exemptPrefix : SPARK_CL_PREEMPT_FILES) {
+            if (name.contains(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
new file mode 100644
index 0000000..dba782b
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class SparkClassLoader extends URLClassLoader {
+    //preempt these classes from parent
+    private static String[] SPARK_CL_PREEMPT_CLASSES = new String[] { "org.apache.spark", "scala.",
+            "org.spark_project" };
+
+    //preempt these files from parent
+    private static String[] SPARK_CL_PREEMPT_FILES = new String[] { "spark-version-info.properties", "HiveClientImpl",
+            "org/apache/spark" };
+
+    //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should NOT use parent's first
+    private static String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "javax.ws.rs", "org.apache.hadoop.hive" };
+
+    //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should use parent's first
+    private static String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+            //            // Java standard library:
+            "com.sun.", "launcher.", "java.", "javax.", "org.ietf", "org.omg", "org.w3c", "org.xml", "sunw.", "sun.",
+            // logging
+            "org.apache.commons.logging", "org.apache.log4j", "org.slf4j", "org.apache.hadoop",
+            // Hadoop/HBase/ZK:
+            "org.apache.kylin", "com.intellij", "org.apache.calcite" };
+
+    private static final Set<String> classNotFoundCache = new HashSet<>();
+    private static Logger logger = LoggerFactory.getLogger(SparkClassLoader.class);
+
+    static {
+        String sparkclassloader_spark_cl_preempt_classes = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES");
+        if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_classes)) {
+            SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkclassloader_spark_cl_preempt_classes, ",");
+        }
+
+        String sparkclassloader_spark_cl_preempt_files = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES");
+        if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_files)) {
+            SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkclassloader_spark_cl_preempt_files, ",");
+        }
+
+        String sparkclassloader_this_cl_precedent_classes = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
+        if (!StringUtils.isEmpty(sparkclassloader_this_cl_precedent_classes)) {
+            THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_this_cl_precedent_classes, ",");
+        }
+
+        String sparkclassloader_parent_cl_precedent_classes = System
+                .getenv("SPARKCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
+        if (!StringUtils.isEmpty(sparkclassloader_parent_cl_precedent_classes)) {
+            PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_parent_cl_precedent_classes, ",");
+        }
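+        // Each list can be overridden at deploy time, e.g. (illustrative value only):
+        //   export SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES="org.apache.spark,scala.,org.spark_project"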
+
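+        // ClassLoader.registerAsParallelCapable() is protected static, so it is invoked
+        // reflectively here to allow concurrent class loading in this loader.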
+        try {
+            final Method registerParallel = ClassLoader.class.getDeclaredMethod("registerAsParallelCapable");
+            AccessController.doPrivileged(new PrivilegedAction<Object>() {
+                public Object run() {
+                    registerParallel.setAccessible(true);
+                    return null;
+                }
+            });
+            Boolean result = (Boolean) registerParallel.invoke(null);
+            if (!result) {
+                logger.warn("Failed to register SparkClassLoader as parallel capable");
+            }
+        } catch (Exception ignore) {
+
+        }
+    }
+
+    /**
+     * Creates a classloader that can load classes dynamically
+     * from jar files under a specific folder.
+     *
+     * @param parent the parent ClassLoader to set.
+     */
+    protected SparkClassLoader(ClassLoader parent) throws IOException {
+        super(new URL[] {}, parent);
+        init();
+    }
+
+    public void init() throws MalformedURLException {
+        String spark_home = System.getenv("SPARK_HOME");
+        if (spark_home == null) {
+            spark_home = System.getProperty("SPARK_HOME");
+            if (spark_home == null) {
+                throw new RuntimeException(
+                        "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+            }
+        }
+        File file = new File(spark_home + "/jars");
+        File[] jars = file.listFiles();
+        for (File jar : jars) {
+            addURL(jar.toURI().toURL());
+        }
+        if (System.getenv("KYLIN_HOME") != null) {
+            // for prod
+            String kylin_home = System.getenv("KYLIN_HOME");
+            File sparkJar = findFile(kylin_home + "/lib", "kylin-udf-.*.jar");
+            if (sparkJar != null) {
+                logger.info("Add kylin UDF jar to spark classloader : " + sparkJar.getName());
+                addURL(sparkJar.toURI().toURL());
+            } else {
+                logger.warn("Can not found kylin UDF jar, please set KYLIN_HOME and make sure the kylin-udf-*.jar exists in $KYLIN_HOME/lib");
+            }
+        } else if (Files.exists(Paths.get("../udf/target/classes"))) {
+            //  for debugtomcat
+            logger.info("Add kylin UDF classes to spark classloader");
+            addURL(new File("../udf/target/classes").toURI().toURL());
+        }
+
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+
+        if (needToUseGlobal(name)) {
+            logger.debug("delegate " + name + " directly to parent");
+            return super.loadClass(name, resolve);
+        }
+        return doLoadclass(name);
+    }
+
+    private Class<?> doLoadclass(String name) throws ClassNotFoundException {
+        synchronized (getClassLoadingLock(name)) {
+            // Check whether the class has already been loaded:
+            Class<?> clasz = findLoadedClass(name);
+            if (clasz != null) {
+                logger.debug("Class " + name + " already loaded");
+            } else {
+                try {
+                    // Try to find this class using the URLs passed to this ClassLoader
+                    logger.debug("Finding class: " + name);
+                    clasz = super.findClass(name);
+                    if (clasz == null) {
+                        logger.debug("cannot find class" + name);
+                    }
+                } catch (ClassNotFoundException e) {
+                    classNotFoundCache.add(name);
+                    // Class not found using this ClassLoader, so delegate to parent
+                    logger.debug("Class " + name + " not found - delegating to parent");
+                    try {
+                        // The sparder and query modules have classes under org.apache.spark
+                        // that rely on libraries not shipped in spark/jars, so fall back to
+                        // the parent classloader.
+                        clasz = getParent().loadClass(name);
+                    } catch (ClassNotFoundException e2) {
+                        // Class not found in this ClassLoader or in the parent ClassLoader
+                        // Log some debug output before re-throwing ClassNotFoundException
+                        logger.debug("Class " + name + " not found in parent loader");
+                        throw e2;
+                    }
+                }
+            }
+            return clasz;
+        }
+    }
+
+    private boolean isThisCLPrecedent(String name) {
+        for (String exemptPrefix : THIS_CL_PRECEDENT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isParentCLPrecedent(String name) {
+        for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
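+    // Delegate to the parent only for parent-precedent prefixes that are neither
+    // preempted nor pinned to this loader. For example (a sketch): "java.util.List"
+    // is delegated parent-first, "org.apache.spark.SparkConf" is resolved from
+    // SPARK_HOME/jars by this loader, and "javax.ws.rs" classes are searched here first.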
+    private boolean needToUseGlobal(String name) {
+        if (name.startsWith("org.apache.spark.sql.execution.datasources.sparder.batch.SparderBatchFileFormat")) {
+            return true;
+        }
+
+        return !isThisCLPrecedent(name) && !classNeedPreempt(name) && isParentCLPrecedent(name);
+    }
+
+    boolean classNeedPreempt(String name) {
+        if (classNotFoundCache.contains(name)) {
+            return false;
+        }
+        for (String exemptPrefix : SPARK_CL_PREEMPT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    boolean fileNeedPreempt(String name) {
+        for (String exemptPrefix : SPARK_CL_PREEMPT_FILES) {
+            if (name.contains(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
new file mode 100644
index 0000000..89717ec
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.apache.catalina.loader.ParallelWebappClassLoader;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class TomcatClassLoader extends ParallelWebappClassLoader {
+    private static String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+            // Java standard library:
+            "com.sun.", "launcher.", "javax.", "org.ietf", "java", "org.omg", "org.w3c", "org.xml", "sunw.",
+            // logging
+            "org.slf4j", "org.apache.commons.logging", "org.apache.log4j", "org.apache.catalina", "org.apache.tomcat" };
+
+    private static String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "org.apache.kylin",
+            "org.apache.calcite" };
+
+    private static String[] CODEGEN_CLASSES = new String[] { "org.apache.spark.sql.catalyst.expressions.Object",
+            "Baz" };
+
+    private static final Set<String> wontFindClasses = new HashSet<>();
+
+    static {
+        String tomcatclassloader_parent_cl_precedent_classes = System
+                .getenv("TOMCATCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
+        if (!StringUtils.isEmpty(tomcatclassloader_parent_cl_precedent_classes)) {
+            PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_parent_cl_precedent_classes, ",");
+        }
+
+        String tomcatclassloader_this_cl_precedent_classes = System
+                .getenv("TOMCATCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
+        if (!StringUtils.isEmpty(tomcatclassloader_this_cl_precedent_classes)) {
+            THIS_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_this_cl_precedent_classes, ",");
+        }
+
+        String tomcatclassloader_codegen_classes = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES");
+        if (!StringUtils.isEmpty(tomcatclassloader_codegen_classes)) {
+            CODEGEN_CLASSES = StringUtils.split(tomcatclassloader_codegen_classes, ",");
+        }
+
+        wontFindClasses.add("Class");
+        wontFindClasses.add("Object");
+        wontFindClasses.add("org");
+        wontFindClasses.add("java.lang.org");
+        wontFindClasses.add("java.lang$org");
+        wontFindClasses.add("java$lang$org");
+        wontFindClasses.add("org.apache");
+        wontFindClasses.add("org.apache.calcite");
+        wontFindClasses.add("org.apache.calcite.runtime");
+        wontFindClasses.add("org.apache.calcite.linq4j");
+        wontFindClasses.add("Long");
+        wontFindClasses.add("String");
+    }
+
+    public static TomcatClassLoader defaultClassLoad = null;
+    private static Logger logger = LoggerFactory.getLogger(TomcatClassLoader.class);
+    public SparkClassLoader sparkClassLoader;
+
+    /**
+     * Creates a classloader that can load classes dynamically
+     * from jar files under a specific folder.
+     *
+     * @param parent the parent ClassLoader to set.
+     */
+    public TomcatClassLoader(ClassLoader parent) throws IOException {
+        super(parent);
+        sparkClassLoader = new SparkClassLoader(this);
+        ClassLoaderUtils.setSparkClassLoader(sparkClassLoader);
+        ClassLoaderUtils.setOriginClassLoader(this);
+        defaultClassLoad = this;
+        init();
+    }
+
+    public void init() {
+        String spark_home = System.getenv("SPARK_HOME");
+        try {
+            // SparkContext uses SPI to resolve the deploy mode; without this jar,
+            // SparkContext initialization fails because the yarn deploy mode cannot be found.
+            File yarnJar = findFile(spark_home + "/jars", "spark-yarn.*.jar");
+            addURL(yarnJar.toURI().toURL());
+            // Jersey inside Spark looks up @Path class files in the current classloader,
+            // so this cannot be delegated to the Spark loader; otherwise the executors
+            // tab of the Spark web UI fails to render.
+            File coreJar = findFile(spark_home + "/jars", "spark-core.*.jar");
+            addURL(coreJar.toURI().toURL());
+        } catch (MalformedURLException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        // When Calcite compiles generated code it probes some bogus class names that are not worth an actual lookup.
+        if (isWontFind(name)) {
+            throw new ClassNotFoundException();
+        }
+        // The parent of Spark's codegen classloader is Thread.currentThread().getContextClassLoader(),
+        // and Calcite's generated "Baz" classes use EnumerableInterpretable's classloader.
+        if (isCodeGen(name)) {
+            throw new ClassNotFoundException();
+        }
+        // These classes must come from the shared (global) classloader so every loader sees the same types.
+        if (name.startsWith("org.apache.kylin.ext")) {
+            return parent.loadClass(name);
+        }
+
+        if (name.startsWith("org.apache.spark.sql.execution.datasources.sparder.batch.SparderBatchFileFormat")) {
+            return super.loadClass(name, resolve);
+        }
+
+        // classes the Spark classloader needs to preempt
+        if (sparkClassLoader.classNeedPreempt(name)) {
+            return sparkClassLoader.loadClass(name);
+        }
+        // The Tomcat classpath includes KAP_HOME/lib; make sure this classloader can load KAP classes.
+        if (isParentCLPrecedent(name) && !isThisCLPrecedent(name)) {
+            logger.debug("delegate " + name + " directly to parent");
+            return parent.loadClass(name);
+        }
+        return super.loadClass(name, resolve);
+    }
+
+    @Override
+    public InputStream getResourceAsStream(String name) {
+        if (sparkClassLoader.fileNeedPreempt(name)) {
+            return sparkClassLoader.getResourceAsStream(name);
+        }
+        return super.getResourceAsStream(name);
+
+    }
+
+    private boolean isParentCLPrecedent(String name) {
+        for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isThisCLPrecedent(String name) {
+        for (String exemptPrefix : THIS_CL_PRECEDENT_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isWontFind(String name) {
+        return wontFindClasses.contains(name);
+    }
+
+    private boolean isCodeGen(String name) {
+        for (String exemptPrefix : CODEGEN_CLASSES) {
+            if (name.startsWith(exemptPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/tool/pom.xml b/tool/pom.xml
index 958bd55..1b3e305 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -47,6 +47,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-parquet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
 
diff --git a/webapp/app/META-INF/context.xml b/webapp/app/META-INF/context.xml
new file mode 100644
index 0000000..0ad90dc
--- /dev/null
+++ b/webapp/app/META-INF/context.xml
@@ -0,0 +1,38 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~  
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~  
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!-- The contents of this file will be loaded for each web application -->
+<Context allowLinking="true">
+
+    <!-- Default set of monitored resources -->
+    <WatchedResource>WEB-INF/web.xml</WatchedResource>
+
+    <!-- Uncomment this to disable session persistence across Tomcat restarts -->
+    <!--
+    <Manager pathname="" />
+    -->
+
+    <!-- Uncomment this to enable Comet connection tracking (provides events
+         on session expiration as well as webapp lifecycle) -->
+    <!--
+    <Valve className="org.apache.catalina.valves.CometConnectionManagerValve" />
+    -->
+
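+    <!-- Route webapp class loading through Kylin's debug classloader so Spark and
+         Calcite classes resolve correctly when running from a development sandbox. -->
+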
+    <Loader loaderClass="org.apache.kylin.ext.DebugTomcatClassLoader"/>
+
+</Context>