Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:30 UTC

[kylin] 06/06: KYLIN-3625 code review

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4d79e0de1593660fa808fcf842592c5fe8dc9b15
Author: shaofengshi <sh...@apache.org>
AuthorDate: Mon Dec 10 11:40:21 2018 +0800

    KYLIN-3625 code review
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   2 +-
 .../src/main/resources/kylin-defaults.properties   |  12 +-
 .../java/org/apache/kylin/gridtable/GTRecord.java  |  45 ---
 .../java/org/apache/kylin/gridtable/GTUtil.java    |  36 ++
 .../filter/BuiltInFunctionTupleFilter.java         |  16 +-
 .../kylin/metadata/filter/CaseTupleFilter.java     |   6 +-
 .../kylin/metadata/filter/ColumnTupleFilter.java   |   2 +-
 .../kylin/metadata/filter/CompareTupleFilter.java  |   8 +-
 .../kylin/metadata/filter/ConstantTupleFilter.java |   2 +-
 .../kylin/metadata/filter/DynamicTupleFilter.java  |   2 +-
 .../kylin/metadata/filter/ExtractTupleFilter.java  |   2 +-
 .../kylin/metadata/filter/LogicalTupleFilter.java  |   4 +-
 .../apache/kylin/metadata/filter/TupleFilter.java  |   2 +-
 .../metadata/filter/UDF/MassInTupleFilter.java     |   2 +-
 .../metadata/filter/UnsupportedTupleFilter.java    |   2 +-
 .../storage/gtrecord/CubeScanRangePlanner.java     |   2 +-
 .../storage/path/DefaultStoragePathBuilder.java    |   1 -
 .../kylin/storage/path/IStoragePathBuilder.java    |   8 +-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |   4 +-
 .../engine/spark/SparkCubingByLayerParquet.java    | 433 ---------------------
 .../apache/kylin/rest/job/StorageCleanupJob.java   |   2 +-
 .../execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930   |   2 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |  10 +-
 .../apache/kylin/source/hive/HiveSparkInput.java   |   2 +-
 .../apache/kylin/source/hive/HiveMRInputTest.java  |   4 +-
 .../apache/kylin/source/kafka/KafkaMRInput.java    |   4 +-
 .../apache/kylin/source/kafka/KafkaSparkInput.java |   2 +-
 .../storage/hbase/lookup/HBaseLookupMRSteps.java   |   2 +-
 .../hbase/steps/HDFSPathGarbageCollectionStep.java |   2 +-
 .../kylin/storage/hbase/util/CubeMigrationCLI.java |   4 +-
 .../storage/hbase/util/StorageCleanupJob.java      |   2 +-
 .../kylin/storage/parquet/ParquetStorage.java      |   2 +-
 .../storage/parquet/cube/CubeStorageQuery.java     |   9 -
 .../storage/parquet/spark/ParquetPayload.java      |   5 +-
 .../kylin/storage/parquet/spark/ParquetTask.java   |  57 +--
 .../storage/parquet/spark/SparkSubmitter.java      |   9 +-
 .../spark/gtscanner/ParquetRecordGTScanner.java    |  54 ++-
 .../gtscanner/ParquetRecordGTScanner4Cube.java     |  34 +-
 .../parquet/steps/ConvertToParquetReducer.java     |  21 +-
 .../parquet/steps/CuboidToPartitionMapping.java    |  25 +-
 .../storage/parquet/steps/MRCubeParquetJob.java    |  12 +-
 .../storage/parquet/steps/ParquetConvertor.java    |   9 +-
 .../storage/parquet/steps/ParquetJobSteps.java     |   6 +-
 .../storage/parquet/steps/ParquetMROutput.java     |   6 -
 .../storage/parquet/steps/ParquetMRSteps.java      |   1 -
 .../storage/parquet/steps/SparkCubeParquet.java    |  61 +--
 .../apache/kylin/ext/DebugTomcatClassLoader.java   |   2 +-
 .../org/apache/kylin/ext/SparkClassLoader.java     |   4 +-
 .../org/apache/kylin/tool/CubeMigrationCLI.java    |   4 +-
 webapp/app/js/model/cubeConfig.js                  |   4 +-
 50 files changed, 239 insertions(+), 713 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2633ddf..e7f7236 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1864,7 +1864,7 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.source.jdbc.adaptor");
     }
 
-    public String getStorageSystemPathBuilderClz() {
+    public String getStoragePathBuilder() {
         return getOptional("storage.path.builder", "org.apache.kylin.storage.path.DefaultStoragePathBuilder");
     }
 }
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 8115a50..d0a4ae2 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -357,23 +357,21 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
 
 kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
 ## for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kylin.storage.columnar.spark-conf" and append here
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dzipkin.collector-hostname=${ZIPKIN_HOSTNAME} -Dzipkin.collector-port=${ZIPKIN_SCRIBE_PORT} -DinfluxDB.address=${INFLUXDB_ADDRESS} -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkap.spark.identifier=${KAP_SPARK_IDENTIFIER} -Dkap.hdfs.working.dir=${KAP_HDFS_WORKING_DIR} -Dkap.metadata.url=${KAP_METADATA_IDENTIFIER} -XX:MaxDirectMemorySize=896M -Dsparder.dict.cache.size=${SPARDER [...]
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
 kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
 kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
-#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
+kylin.storage.columnar.spark-conf.yarn.am.memory=512m
 kylin.storage.columnar.spark-conf.spark.driver.memory=1g
 kylin.storage.columnar.spark-conf.spark.executor.memory=1g
-kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=512
-kylin.storage.columnar.spark-conf.yarn.am.memory=512m
 kylin.storage.columnar.spark-conf.spark.executor.cores=1
 kylin.storage.columnar.spark-conf.spark.executor.instances=1
+kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=378
+kylin.storage.columnar.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 kylin.storage.columnar.spark-conf.spark.task.maxFailures=1
-kylin.storage.columnar.spark-conf.spark.ui.port=4041
 kylin.storage.columnar.spark-conf.spark.locality.wait=0s
 kylin.storage.columnar.spark-conf.spark.sql.dialect=hiveql
-kylin.storage.columnar.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
 kylin.storage.columnar.spark-conf.hive.execution.engine=MR
 kylin.storage.columnar.spark-conf.spark.scheduler.listenerbus.eventqueue.size=100000000
 kylin.storage.columnar.spark-conf.spark.master=yarn-client
 kylin.storage.columnar.spark-conf.spark.broadcast.compress=false
-
+#kylin.storage.columnar.spark-conf.spark.yarn.archive=hdfs://namenode:8020/kylin/spark/spark-libs.jar
\ No newline at end of file
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 24278c4..8f0e0fb 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -20,17 +20,7 @@ package org.apache.kylin.gridtable;
 
 import com.google.common.base.Preconditions;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.dimension.DictionaryDimEnc;
-import org.apache.kylin.measure.bitmap.BitmapSerializer;
-import org.apache.kylin.measure.dim.DimCountDistincSerializer;
-import org.apache.kylin.measure.extendedcolumn.ExtendedColumnSerializer;
-import org.apache.kylin.measure.hllc.HLLCSerializer;
-import org.apache.kylin.measure.percentile.PercentileSerializer;
-import org.apache.kylin.measure.raw.RawSerializer;
-import org.apache.kylin.measure.topn.TopNCounterSerializer;
-import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -112,41 +102,6 @@ public class GTRecord implements Comparable<GTRecord> {
         return this;
     }
 
-    /** set record to the codes of specified values, reuse given space to hold the codes */
-    @SuppressWarnings("checkstyle:BooleanExpressionComplexity")
-    public GTRecord setValuesParquet(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
-        assert selectedCols.cardinality() == values.length;
-
-        ByteBuffer buf = space.asBuffer();
-        int pos = buf.position();
-        for (int i = 0; i < selectedCols.trueBitCount(); i++) {
-            int c = selectedCols.trueBitAt(i);
-
-            DataTypeSerializer serializer = info.codeSystem.getSerializer(c);
-            if (serializer instanceof DictionaryDimEnc.DictionarySerializer) {
-                int len = serializer.peekLength(buf);
-                BytesUtil.writeUnsigned((Integer) values[i], len, buf);
-                int newPos = buf.position();
-                cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
-                pos = newPos;
-            } else if (serializer instanceof TopNCounterSerializer ||
-                    serializer instanceof HLLCSerializer ||
-                    serializer instanceof BitmapSerializer ||
-                    serializer instanceof ExtendedColumnSerializer ||
-                    serializer instanceof PercentileSerializer ||
-                    serializer instanceof DimCountDistincSerializer ||
-                    serializer instanceof RawSerializer) {
-                cols[c].reset((byte[]) values[i], 0, ((byte[]) values[i]).length);
-            } else {
-                info.codeSystem.encodeColumnValue(c, values[i], buf);
-                int newPos = buf.position();
-                cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
-                pos = newPos;
-            }
-        }
-        return this;
-    }
-
     /** decode and return the values of this record */
     public Object[] getValues() {
         return getValues(info.colAll, new Object[info.getColumnCount()]);
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
index 49c68c5..8821a8f 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -456,4 +456,40 @@ public class GTUtil {
             }
         }
     }
+
+    /** set record to the codes of specified values, reuse given space to hold the codes */
+    public static GTRecord setValuesParquet(GTRecord record, ByteBuffer buf, Map<Integer, Integer> dictCols,
+            Map<Integer, Integer> binaryCols, Map<Integer, Integer> otherCols, Object[] values) {
+
+        int pos = buf.position();
+        int i, c;
+        for (Map.Entry<Integer, Integer> entry : dictCols.entrySet()) {
+            i = entry.getKey();
+            c = entry.getValue();
+            DictionaryDimEnc.DictionarySerializer serializer = (DictionaryDimEnc.DictionarySerializer) record.info.codeSystem
+                    .getSerializer(c);
+            int len = serializer.peekLength(buf);
+            BytesUtil.writeUnsigned((Integer) values[i], len, buf);
+            int newPos = buf.position();
+            record.cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+            pos = newPos;
+        }
+
+        for (Map.Entry<Integer, Integer> entry : binaryCols.entrySet()) {
+            i = entry.getKey();
+            c = entry.getValue();
+            record.cols[c].reset((byte[]) values[i], 0, ((byte[]) values[i]).length);
+        }
+
+        for (Map.Entry<Integer, Integer> entry : otherCols.entrySet()) {
+            i = entry.getKey();
+            c = entry.getValue();
+            record.info.codeSystem.encodeColumnValue(c, values[i], buf);
+            int newPos = buf.position();
+            record.cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+            pos = newPos;
+        }
+
+        return record;
+    }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
index 38cbd66..36b5369 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
@@ -177,18 +177,18 @@ public class BuiltInFunctionTupleFilter extends FunctionTupleFilter {
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         List<? extends TupleFilter> childFilter = this.getChildren();
         String op = this.getName();
         switch (op) {
             case "LIKE":
                 assert childFilter.size() == 2;
-                return childFilter.get(0).toSparkSqlFilter() + toSparkFuncMap.get(op) + childFilter.get(1).toSparkSqlFilter();
+                return childFilter.get(0).toSQL() + toSparkFuncMap.get(op) + childFilter.get(1).toSQL();
             case "||":
                 StringBuilder result = new StringBuilder().append(toSparkFuncMap.get(op)).append("(");
                 int index = 0;
                 for (TupleFilter filter : childFilter) {
-                    result.append(filter.toSparkSqlFilter());
+                    result.append(filter.toSQL());
                     if (index < childFilter.size() - 1) {
                         result.append(",");
                     }
@@ -200,17 +200,17 @@ public class BuiltInFunctionTupleFilter extends FunctionTupleFilter {
             case "UPPER":
             case "CHAR_LENGTH":
                 assert childFilter.size() == 1;
-                return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSparkSqlFilter() + ")";
+                return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSQL() + ")";
             case "SUBSTRING":
                 assert childFilter.size() == 3;
-                return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSparkSqlFilter() + "," + childFilter.get(1).toSparkSqlFilter() + "," + childFilter.get(2).toSparkSqlFilter() + ")";
+                return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSQL() + "," + childFilter.get(1).toSQL() + "," + childFilter.get(2).toSQL() + ")";
             default:
                 if (childFilter.size() == 1) {
-                    return op + "(" + childFilter.get(0).toSparkSqlFilter() + ")";
+                    return op + "(" + childFilter.get(0).toSQL() + ")";
                 } else if (childFilter.size() == 2) {
-                    return childFilter.get(0).toSparkSqlFilter() + op + childFilter.get(1).toSparkSqlFilter();
+                    return childFilter.get(0).toSQL() + op + childFilter.get(1).toSQL();
                 } else if (childFilter.size() == 3) {
-                    return op + "(" + childFilter.get(0).toSparkSqlFilter() + "," + childFilter.get(1).toSparkSqlFilter() + "," + childFilter.get(2).toSparkSqlFilter() + ")";
+                    return op + "(" + childFilter.get(0).toSQL() + "," + childFilter.get(1).toSQL() + "," + childFilter.get(2).toSQL() + ")";
                 }
                 throw new IllegalArgumentException("Operator " + op + " is not supported");
         }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 4305557..16e645b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -134,16 +134,16 @@ public class CaseTupleFilter extends TupleFilter implements IOptimizeableTupleFi
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         String result = "(case ";
         TupleFilter whenFilter;
         TupleFilter thenFilter;
         for (int i = 0; i < this.getWhenFilters().size(); i++) {
             whenFilter = this.getWhenFilters().get(i);
             thenFilter = this.getThenFilters().get(i);
-            result += " when " + whenFilter.toSparkSqlFilter() + " then " + thenFilter.toSparkSqlFilter();
+            result += " when " + whenFilter.toSQL() + " then " + thenFilter.toSQL();
         }
-        result += " else " + this.getElseFilter().toSparkSqlFilter();
+        result += " else " + this.getElseFilter().toSQL();
         result += " end)";
         return result;
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 09a16f5..caab5a7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -163,7 +163,7 @@ public class ColumnTupleFilter extends TupleFilter {
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         return this.columnRef.getTableAlias() + "_" + this.columnRef.getName();
     }
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index b63ac0a..33e9819 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -280,7 +280,7 @@ public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupl
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         List<? extends TupleFilter> childFilter = this.getChildren();
         switch (this.getOperator()) {
             case EQ:
@@ -290,15 +290,15 @@ public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupl
             case GTE:
             case LTE:
                 assert childFilter.size() == 2;
-                return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator()) + childFilter.get(1).toSparkSqlFilter();
+                return childFilter.get(0).toSQL() + toSparkOpMap.get(this.getOperator()) + childFilter.get(1).toSQL();
             case IN:
             case NOTIN:
                 assert childFilter.size() == 2;
-                return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator()) + "(" + childFilter.get(1).toSparkSqlFilter() + ")";
+                return childFilter.get(0).toSQL() + toSparkOpMap.get(this.getOperator()) + "(" + childFilter.get(1).toSQL() + ")";
             case ISNULL:
             case ISNOTNULL:
                 assert childFilter.size() == 1;
-                return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator());
+                return childFilter.get(0).toSQL() + toSparkOpMap.get(this.getOperator());
             default:
                 throw new IllegalStateException("operator " + this.getOperator() + " not supported: ");
         }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index e9ecf16..61d7fb6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -114,7 +114,7 @@ public class ConstantTupleFilter extends TupleFilter {
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         if (this.equals(TRUE)) {
             return "true";
         } else if (this.equals(FALSE)) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index c4490e5..ffc1527 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -79,7 +79,7 @@ public class DynamicTupleFilter extends TupleFilter {
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         return "1=1";
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index 36ea021..59c3e5f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -123,7 +123,7 @@ public class ExtractTupleFilter extends TupleFilter {
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         return "1=1";
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index 99cf3f3..ebea184 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -155,7 +155,7 @@ public class LogicalTupleFilter extends TupleFilter implements IOptimizeableTupl
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         StringBuilder result = new StringBuilder("");
         switch (this.getOperator()) {
             case AND:
@@ -164,7 +164,7 @@ public class LogicalTupleFilter extends TupleFilter implements IOptimizeableTupl
                 String op = toSparkOpMap.get(this.getOperator());
                 int index = 0;
                 for (TupleFilter filter : this.getChildren()) {
-                    result.append(filter.toSparkSqlFilter());
+                    result.append(filter.toSQL());
                     if (index < this.getChildren().size() - 1) {
                         result.append(op);
                     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 28a8c6c..0d3fb3e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -421,6 +421,6 @@ public abstract class TupleFilter {
         }
     }
 
-    public abstract String toSparkSqlFilter();
+    public abstract String toSQL();
 
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
index b153003..ab7f6e0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
@@ -154,7 +154,7 @@ public class MassInTupleFilter extends FunctionTupleFilter {
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         return "1=1";
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
index 143ee6d..3bd147c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
@@ -58,7 +58,7 @@ public class UnsupportedTupleFilter extends TupleFilter {
     }
 
     @Override
-    public String toSparkSqlFilter() {
+    public String toSQL() {
         return "1=1";
     }
 }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 60fe33f..9c21d82 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -111,7 +111,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
         this.havingFilter = havingFilter;
 
         if (convertedFilter != null) {
-            this.filterPushDownSQL = convertedFilter.toSparkSqlFilter();
+            this.filterPushDownSQL = convertedFilter.toSQL();
             logger.info("--filterPushDownSQL--: {}", this.filterPushDownSQL);
         }
 
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java b/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java
index 1e50ba0..858a431 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java
@@ -34,7 +34,6 @@ public class DefaultStoragePathBuilder implements IStoragePathBuilder {
     @Override
     public String getJobRealizationRootPath(CubeSegment cubeSegment, String jobId) {
         String jobWorkingDir = getJobWorkingDir(cubeSegment.getConfig().getHdfsWorkingDirectory(), jobId);
-
         return jobWorkingDir + SLASH + cubeSegment.getRealization().getName();
     }
 
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java b/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java
index 7a70f98..5ad20d1 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java
@@ -21,11 +21,11 @@ package org.apache.kylin.storage.path;
 import org.apache.kylin.cube.CubeSegment;
 
 public interface IStoragePathBuilder {
-    public static final String SLASH = "/";
+    String SLASH = "/";
 
-    public String getJobWorkingDir(String workingDir, String jobId);
+    String getJobWorkingDir(String workingDir, String jobId);
 
-    public String getJobRealizationRootPath(CubeSegment cubeSegment, String jobId);
+    String getJobRealizationRootPath(CubeSegment cubeSegment, String jobId);
 
-    public String getRealizationFinalDataPath(CubeSegment cubeSegment);
+    String getRealizationFinalDataPath(CubeSegment cubeSegment);
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 8525dcc..934c04e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -77,8 +77,8 @@ public class JobBuilderSupport {
         this.config = new JobEngineConfig(seg.getConfig());
         this.seg = seg;
         this.submitter = submitter;
-        String pathBuilderClz = seg.getConfig().getStorageSystemPathBuilderClz();
-        this.storagePathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(pathBuilderClz);
+        String pathBuilder = seg.getConfig().getStoragePathBuilder();
+        this.storagePathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(pathBuilder);
     }
 
     public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
deleted file mode 100644
index 154c058..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dimension.AbstractDateDimEnc;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.dimension.FixedLenHexDimEnc;
-import org.apache.kylin.dimension.IDimensionEncodingMap;
-import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.engine.mr.common.SerializableConfiguration;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.measure.basic.BigDecimalIngester;
-import org.apache.kylin.measure.basic.DoubleIngester;
-import org.apache.kylin.measure.basic.LongIngester;
-import org.apache.kylin.measure.percentile.PercentileSerializer;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.GroupFactory;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Types;
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-public class SparkCubingByLayerParquet extends SparkCubingByLayer {
-    @Override
-    protected void saveToHDFS(JavaPairRDD<ByteArray, Object[]> rdd, String metaUrl, String cubeName, CubeSegment cubeSeg, String hdfsBaseLocation, int level, Job job, KylinConfig kylinConfig) throws Exception {
-        final IDimensionEncodingMap dimEncMap = cubeSeg.getDimensionEncodingMap();
-
-        Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSeg.getCubeDesc());
-
-        final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
-        final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
-
-        MessageType schema = cuboidToMessageType(baseCuboid, dimEncMap, cubeSeg.getCubeDesc(), colTypeMap, meaTypeMap);
-
-        logger.info("Schema: {}", schema.toString());
-
-        final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
-
-        logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
-
-        JavaPairRDD<ByteArray, Object[]> repartitionedRDD = rdd.repartitionAndSortWithinPartitions(new CuboidPartitioner(cuboidToPartitionMapping));
-
-        String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
-
-        job.setOutputFormatClass(CustomParquetOutputFormat.class);
-        GroupWriteSupport.setSchema(schema, job.getConfiguration());
-        CustomParquetOutputFormat.setOutputPath(job, new Path(output));
-        CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
-        CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
-
-        JavaPairRDD<Void, Group> groupRDD = repartitionedRDD.mapToPair(new GenerateGroupRDDFunction(cubeName, cubeSeg.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap));
-
-        groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
-    }
-
-    static class CuboidPartitioner extends Partitioner {
-
-        private CuboidToPartitionMapping mapping;
-
-        public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
-            this.mapping = cuboidToPartitionMapping;
-        }
-
-        @Override
-        public int numPartitions() {
-            return mapping.getNumPartitions();
-        }
-
-        @Override
-        public int getPartition(Object key) {
-            ByteArray byteArray = (ByteArray) key;
-            long cuboidId = Bytes.toLong(byteArray.array(), RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
-
-            return mapping.getRandomPartitionForCuboidId(cuboidId);
-        }
-    }
-
-    public static class CuboidToPartitionMapping implements Serializable {
-        private Map<Long, List<Integer>> cuboidPartitions;
-        private int partitionNum;
-
-        public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
-            this.cuboidPartitions = cuboidPartitions;
-            int partitions = 0;
-            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
-                partitions = partitions + entry.getValue().size();
-            }
-            this.partitionNum = partitions;
-        }
-
-        public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
-            cuboidPartitions = Maps.newHashMap();
-
-            List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
-            CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
-
-            int position = 0;
-            for (Long cuboidId : layeredCuboids) {
-                int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
-                List<Integer> positions = Lists.newArrayListWithCapacity(partition);
-
-                for (int i = position; i < position + partition; i++) {
-                    positions.add(i);
-                }
-
-                cuboidPartitions.put(cuboidId, positions);
-                position = position + partition;
-            }
-
-            this.partitionNum = position;
-        }
-
-        public String serialize() throws JsonProcessingException {
-            return JsonUtil.writeValueAsString(cuboidPartitions);
-        }
-
-        public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
-            Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
-            return new CuboidToPartitionMapping(cuboidPartitions);
-        }
-
-        public int getNumPartitions() {
-            return this.partitionNum;
-        }
-
-        public long getCuboidIdByPartition(int partition) {
-            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
-                if (entry.getValue().contains(partition)) {
-                    return entry.getKey();
-                }
-            }
-
-            throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
-        }
-
-        public int getRandomPartitionForCuboidId(long cuboidId) {
-            List<Integer> partitions = cuboidPartitions.get(cuboidId);
-            return partitions.get(new Random().nextInt(partitions.size()));
-        }
-
-        public int getPartitionNumForCuboidId(long cuboidId) {
-            return cuboidPartitions.get(cuboidId).size();
-        }
-
-        public String getPartitionFilePrefix(int partition) {
-            String prefix = "cuboid_";
-            long cuboid = getCuboidIdByPartition(partition);
-            int partNum = partition % getPartitionNumForCuboidId(cuboid);
-            prefix = prefix + cuboid + "_part" + partNum;
-
-            return prefix;
-        }
-
-        private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
-            double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
-            float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
-            int partition = (int) (cuboidSize / rddCut);
-            partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
-            partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
-            return partition;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
-                sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
-            }
-
-            return sb.toString();
-        }
-    }
-
-    public static class CustomParquetOutputFormat extends ParquetOutputFormat {
-        public static final String CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
-
-        @Override
-        public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
-            FileOutputCommitter committer = (FileOutputCommitter)this.getOutputCommitter(context);
-            TaskID taskId = context.getTaskAttemptID().getTaskID();
-            int partition = taskId.getId();
-
-            CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(CUBOID_TO_PARTITION_MAPPING));
-
-            return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
-        }
-
-        public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
-            String jsonStr = cuboidToPartitionMapping.serialize();
-
-            job.getConfiguration().set(CUBOID_TO_PARTITION_MAPPING, jsonStr);
-        }
-    }
-
-    static class GenerateGroupRDDFunction implements PairFunction<Tuple2<ByteArray, Object[]>, Void, Group> {
-        private volatile transient boolean initialized = false;
-        private String cubeName;
-        private String segmentId;
-        private String metaUrl;
-        private SerializableConfiguration conf;
-        private List<MeasureDesc> measureDescs;
-        private RowKeyDecoder decoder;
-        private Map<TblColRef, String> colTypeMap;
-        private Map<MeasureDesc, String> meaTypeMap;
-        private GroupFactory factory;
-        private BufferedMeasureCodec measureCodec;
-        private PercentileSerializer serializer;
-        private ByteBuffer byteBuffer;
-
-        public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
-            this.cubeName = cubeName;
-            this.segmentId = segmentId;
-            this.metaUrl = metaurl;
-            this.conf = conf;
-            this.colTypeMap = colTypeMap;
-            this.meaTypeMap = meaTypeMap;
-        }
-
-        private void init() {
-            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-            CubeDesc cubeDesc = cubeInstance.getDescriptor();
-            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-            measureDescs = cubeDesc.getMeasures();
-            decoder = new RowKeyDecoder(cubeSegment);
-            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
-            measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-            serializer = new PercentileSerializer(DataType.getType("percentile(100)"));
-
-        }
-
-        @Override
-        public Tuple2<Void, Group> call(Tuple2<ByteArray, Object[]> tuple) throws Exception {
-            if (initialized == false) {
-                synchronized (SparkCubingByLayer.class) {
-                    if (initialized == false) {
-                        init();
-                    }
-                }
-            }
-
-            long cuboid = decoder.decode(tuple._1.array());
-            List<String> values = decoder.getValues();
-            List<TblColRef> columns = decoder.getColumns();
-
-            Group group = factory.newGroup();
-
-            // for check
-            group.append("cuboidId", cuboid);
-
-            for (int i = 0; i < columns.size(); i++) {
-                TblColRef column = columns.get(i);
-                parseColValue(group, column, values.get(i));
-            }
-
-            ByteBuffer valueBuf = measureCodec.encode(tuple._2());
-            byte[] encodedBytes = new byte[valueBuf.position()];
-            System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-
-            int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
-
-            int valueOffset = 0;
-            for (int i = 0; i < valueLengths.length; ++i) {
-                MeasureDesc measureDesc = measureDescs.get(i);
-                parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i], tuple._2[i]);
-                valueOffset += valueLengths[i];
-            }
-
-            return new Tuple2<>(null, group);
-        }
-
-        private void parseColValue(final Group group, final TblColRef colRef, final String value) {
-            switch (colTypeMap.get(colRef)) {
-                case "int":
-                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
-                    break;
-                case "long":
-                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
-                    break;
-                default:
-                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
-                    break;
-            }
-        }
-
-        private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length, final Object d) {
-            switch (meaTypeMap.get(measureDesc)) {
-                case "long":
-                    group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
-                    break;
-                case "double":
-                    group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
-                    break;
-                case "decimal":
-                    BigDecimal decimal = (BigDecimal)d;
-                    decimal = decimal.setScale(4);
-                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
-                    break;
-                default:
-                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
-                    break;
-            }
-        }
-    }
-
-    private MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
-        Types.MessageTypeBuilder builder = Types.buildMessage();
-
-        List<TblColRef> colRefs = cuboid.getColumns();
-
-        builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId");
-
-        for (TblColRef colRef : colRefs) {
-            DimensionEncoding dimEnc = dimEncMap.get(colRef);
-
-            if (dimEnc instanceof AbstractDateDimEnc) {
-                builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
-                colTypeMap.put(colRef, "long");
-            } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
-                org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
-                if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
-                    builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
-                    colTypeMap.put(colRef, "long");
-                } else {
-                    // stringFamily && default
-                    builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
-                    colTypeMap.put(colRef, "string");
-                }
-            } else {
-                builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
-                colTypeMap.put(colRef, "int");
-            }
-        }
-
-        MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-
-        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
-            MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
-            org.apache.kylin.metadata.datatype.DataType meaDataType = measureDesc.getFunction().getReturnDataType();
-            MeasureType measureType = measureDesc.getFunction().getMeasureType();
-
-            if (measureType instanceof BasicMeasureType) {
-                MeasureIngester measureIngester = aggrIngesters[i];
-                if (measureIngester instanceof LongIngester) {
-                    builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "long");
-                } else if (measureIngester instanceof DoubleIngester) {
-                    builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "double");
-                } else if (measureIngester instanceof BigDecimalIngester) {
-                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "decimal");
-                } else {
-                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "binary");
-                }
-            } else {
-                builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
-                meaTypeMap.put(measureDesc, "binary");
-            }
-        }
-
-        return builder.named(String.valueOf(cuboid.getId()));
-    }
-
-    private String getColName(TblColRef colRef) {
-        return colRef.getTableAlias() + "_" + colRef.getName();
-    }
-}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 6900ca7..bfc0976 100755
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -102,7 +102,7 @@ public class StorageCleanupJob extends AbstractApplication {
         this.defaultFs = defaultFs;
         this.hbaseFs = hbaseFs;
         this.executableManager = ExecutableManager.getInstance(config);
-        this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+        this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
     }
 
     public void setDelete(boolean delete) {
diff --git a/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930 b/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930
index ed6d6fa..208a0c9e 100644
--- a/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930
+++ b/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930
@@ -21,7 +21,7 @@
     "version" : "2.3.0.20500",
     "name" : "Redistribute Flat Hive Table",
     "tasks" : null,
-    "type" : "org.apache.kylin.source.hive.HiveMRInput$RedistributeFlatHiveTableStep",
+    "type" : "zorg.apache.kylin.source.hive.HiveMRInput$RedistributeFlatHiveTableStep",
     "params" : {
       "HiveInit" : "USE default;\n",
       "HiveRedistributeData" : "INSERT OVERWRITE TABLE kylin_intermediate_ss_a110ac52_9a91_49fe_944a_346d61e54115 SELECT * FROM kylin_intermediate_ss_a110ac52_9a91_49fe_944a_346d61e54115 DISTRIBUTE BY RAND();\n",
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 3c9434b..c2687a1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -115,10 +115,10 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
 
-        final protected IJoinedFlatTableDesc flatDesc;
-        final protected String flatTableDatabase;
-        final protected String hdfsWorkingDir;
-        final protected IStoragePathBuilder pathBuilder;
+        protected final IJoinedFlatTableDesc flatDesc;
+        protected final String flatTableDatabase;
+        protected final String hdfsWorkingDir;
+        protected final IStoragePathBuilder pathBuilder;
 
         List<String> hiveViewIntermediateTables = Lists.newArrayList();
 
@@ -127,7 +127,7 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
             this.flatDesc = flatDesc;
             this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
             this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
         }
 
         @Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index 031548c8..1364eed 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -73,7 +73,7 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
             this.flatDesc = flatDesc;
             this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
             this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
         }
 
         @Override
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
index 1bcd9ce..574c6f2 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
@@ -45,11 +45,11 @@ public class HiveMRInputTest {
         try (SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig)) {
             when(kylinConfig.getHiveTableDirCreateFirst()).thenReturn(true);
             when(kylinConfig.getHdfsWorkingDirectory()).thenReturn("/tmp/kylin/");
-            when(kylinConfig.getStorageSystemPathBuilderClz()).thenReturn("org.apache.kylin.storage.path.DefaultStoragePathBuilder");
+            when(kylinConfig.getStoragePathBuilder()).thenReturn("org.apache.kylin.storage.path.DefaultStoragePathBuilder");
             DefaultChainedExecutable defaultChainedExecutable = mock(DefaultChainedExecutable.class);
             defaultChainedExecutable.setId(RandomUtil.randomUUID().toString());
 
-            String jobWorkingDir = HiveInputBase.getJobWorkingDir(defaultChainedExecutable, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), (IStoragePathBuilder) ClassUtil.newInstance(KylinConfig.getInstanceFromEnv().getStorageSystemPathBuilderClz()));
+            String jobWorkingDir = HiveInputBase.getJobWorkingDir(defaultChainedExecutable, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), (IStoragePathBuilder) ClassUtil.newInstance(KylinConfig.getInstanceFromEnv().getStoragePathBuilder()));
             jobWorkDirPath = new Path(jobWorkingDir);
             Assert.assertTrue(fileSystem.exists(jobWorkDirPath));
         } finally {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 68f87ff..798d099 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -82,7 +82,7 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
         public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig conf) {
             this.cubeSegment = cubeSegment;
             this.conf = conf;
-            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(conf.getConfig().getStorageSystemPathBuilderClz());
+            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(conf.getConfig().getStoragePathBuilder());
         }
 
         @Override
@@ -134,7 +134,7 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
             this.seg = seg;
             this.cubeDesc = seg.getCubeDesc();
             this.cubeName = seg.getCubeInstance().getName();
-            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
         }
 
         @Override
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
index 9c14fd6..9a84612 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -73,7 +73,7 @@ public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
             this.seg = seg;
             this.cubeDesc = seg.getCubeDesc();
             this.cubeName = seg.getCubeInstance().getName();
-            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(this.config.getStorageSystemPathBuilderClz());
+            this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(this.config.getStoragePathBuilder());
         }
 
         @Override
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
index f6d11bd..e8f7713 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
@@ -61,7 +61,7 @@ public class HBaseLookupMRSteps {
     public HBaseLookupMRSteps(CubeInstance cube) {
         this.cube = cube;
         this.config = new JobEngineConfig(cube.getConfig());
-        this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(cube.getConfig().getStorageSystemPathBuilderClz());
+        this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(cube.getConfig().getStoragePathBuilder());
     }
 
     public void addMaterializeLookupTablesSteps(LookupMaterializeContext context) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 05df80d..4c31c45 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -60,7 +60,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         try {
             config = new JobEngineConfig(context.getConfig());
-            pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(context.getConfig().getStorageSystemPathBuilderClz());
+            pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(context.getConfig().getStoragePathBuilder());
             List<String> toDeletePaths = getDeletePaths();
             dropHdfsPathOnCluster(toDeletePaths, HadoopUtil.getWorkingFileSystem());
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index ce43456..619c32c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -186,8 +186,8 @@ public class CubeMigrationCLI {
     }
 
     private static void renameFoldersInHdfs(CubeInstance cube) {
-        IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStorageSystemPathBuilderClz());
-        IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStorageSystemPathBuilderClz());
+        IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStoragePathBuilder());
+        IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStoragePathBuilder());
         for (CubeSegment segment : cube.getSegments()) {
 
             String jobUuid = segment.getLastBuildJobID();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index fd6124b..496b2bb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -187,7 +187,7 @@ public class StorageCleanupJob extends AbstractApplication {
     private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
         JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IStoragePathBuilder pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(engineConfig.getConfig().getStorageSystemPathBuilderClz());
+        IStoragePathBuilder pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(engineConfig.getConfig().getStoragePathBuilder());
 
         FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
         List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
index e05b433..e878ec4 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
@@ -47,7 +47,7 @@ public class ParquetStorage implements IStorage {
         } else if (engineInterface == IMROutput2.class) {
             return (I) new ParquetMROutput();
         } else{
-            throw new RuntimeException("Cannot adapt to " + engineInterface);
+            throw new UnsupportedOperationException("Cannot adapt to " + engineInterface);
         }
     }
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
index a3ce76f..2fb5733 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
@@ -20,10 +20,6 @@ package org.apache.kylin.storage.parquet.cube;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 
 public class CubeStorageQuery extends GTCubeStorageQueryBase {
@@ -33,11 +29,6 @@ public class CubeStorageQuery extends GTCubeStorageQueryBase {
     }
 
     @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        return super.search(context, sqlDigest, returnTupleInfo);
-    }
-
-    @Override
     protected String getGTStorage() {
         return KylinConfig.getInstanceFromEnv().getSparkCubeGTStorage();
     }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
index 3129b8e..2f1aa17 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
@@ -119,7 +119,7 @@ public class ParquetPayload {
         return storageType;
     }
 
-    static public class ParquetPayloadBuilder {
+    public static class ParquetPayloadBuilder {
         private byte[] gtScanRequest;
         private String gtScanRequestId;
         private String kylinProperties;
@@ -136,9 +136,6 @@ public class ParquetPayload {
         private long startTime;
         private int storageType;
 
-        public ParquetPayloadBuilder() {
-        }
-
         public ParquetPayloadBuilder setGtScanRequest(byte[] gtScanRequest) {
             this.gtScanRequest = gtScanRequest;
             return this;
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
index 611ee44..4b08c38 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
@@ -31,7 +31,12 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
+import org.apache.kylin.measure.percentile.PercentileMeasureType;
+import org.apache.kylin.measure.raw.RawMeasureType;
+import org.apache.kylin.measure.topn.TopNMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.spark.api.java.JavaRDD;
@@ -94,7 +99,7 @@ public class ParquetTask implements Serializable {
             String dataFolderName = request.getDataFolderName();
 
             String baseFolder = dataFolderName.substring(0, dataFolderName.lastIndexOf('/'));
-            String cuboidId = dataFolderName.substring(dataFolderName.lastIndexOf("/") + 1);
+            String cuboidId = dataFolderName.substring(dataFolderName.lastIndexOf('/') + 1);
             String prefix = "cuboid_" + cuboidId + "_";
 
             CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(request.getRealizationId());
@@ -117,11 +122,11 @@ public class ParquetTask implements Serializable {
                 pathBuilder.append(p.toString()).append(";");
             }
 
-            logger.info("Columnar path is " + pathBuilder.toString());
-            logger.info("Required Measures: " + StringUtils.join(request.getParquetColumns(), ","));
-            logger.info("Max GT length: " + request.getMaxRecordLength());
+            logger.info("Columnar path is {}", pathBuilder);
+            logger.info("Required Measures: {}", StringUtils.join(request.getParquetColumns(), ","));
+            logger.info("Max GT length: {}", request.getMaxRecordLength());
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new IllegalStateException(e);
         }
     }
 
@@ -138,14 +143,14 @@ public class ParquetTask implements Serializable {
             map.clear();
 
         } catch (IllegalAccessException | NoSuchFieldException e) {
-            throw new RuntimeException(e);
+            throw new IllegalStateException(e);
         }
     }
 
     public Iterator<Object[]> executeTask() {
-        logger.info("Start to visit cube data with Spark SQL <<<<<<");
+        logger.debug("Start to visit cube data with Spark <<<<<<");
 
-        SQLContext sqlContext = new SQLContext(SparderEnv.getSparkSession().sparkContext());
+        SQLContext sqlContext = SparderEnv.getSparkSession().sqlContext();
 
         Dataset<Row> dataset = sqlContext.read().parquet(parquetPaths);
         ImmutableBitSet dimensions = scanRequest.getDimensions();
@@ -175,21 +180,17 @@ public class ParquetTask implements Serializable {
         // sort
         dataset = dataset.sort(getSortColumn(groupBy, mapping));
 
-        JavaRDD<Row> rowRDD = dataset.javaRDD();
-
-        JavaRDD<Object[]> objRDD = rowRDD.map(new Function<Row, Object[]>() {
-            @Override
-            public Object[] call(Row row) throws Exception {
-                Object[] objects = new Object[row.length()];
-                for (int i = 0; i < row.length(); i++) {
-                    objects[i] = row.get(i);
-                }
-                return objects;
+        JavaRDD<Object[]> objRDD = dataset.javaRDD().map((Function<Row, Object[]>) row -> {
+            Object[] objects = new Object[row.length()];
+            for (int i = 0; i < row.length(); i++) {
+                objects[i] = row.get(i);
             }
+            return objects;
         });
 
-        logger.info("partitions: {}", objRDD.getNumPartitions());
+        logger.debug("partitions: {}", objRDD.getNumPartitions());
 
+        // TODO: optimize the way to collect data.
         List<Object[]> result = objRDD.collect();
         return result.iterator();
     }
@@ -216,23 +217,23 @@ public class ParquetTask implements Serializable {
     private Column getAggColumn(String metName, String func, DataType dataType) {
         Column column;
         switch (func) {
-            case "SUM":
+            case FunctionDesc.FUNC_SUM:
                 column = sum(metName);
                 break;
-            case "MIN":
+            case FunctionDesc.FUNC_MIN:
                 column = min(metName);
                 break;
-            case "MAX":
+            case FunctionDesc.FUNC_MAX:
                 column = max(metName);
                 break;
-            case "COUNT":
+            case FunctionDesc.FUNC_COUNT:
                 column = sum(metName);
                 break;
-            case "TOP_N":
-            case "COUNT_DISTINCT":
-            case "EXTENDED_COLUMN":
-            case "PERCENTILE_APPROX":
-            case "RAW":
+            case TopNMeasureType.FUNC_TOP_N:
+            case FunctionDesc.FUNC_COUNT_DISTINCT:
+            case ExtendedColumnMeasureType.FUNC_EXTENDED_COLUMN:
+            case PercentileMeasureType.FUNC_PERCENTILE_APPROX:
+            case RawMeasureType.FUNC_RAW:
                 String udf = UdfManager.register(dataType, func);
                 column = callUDF(udf, col(metName));
                 break;
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
index 1c8c246..604cb6d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.parquet.spark;
 import org.apache.kylin.ext.ClassLoaderUtils;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner;
 import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner4Cube;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,13 +29,9 @@ public class SparkSubmitter {
     public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
 
     public static IGTScanner submitParquetTask(GTScanRequest scanRequest, ParquetPayload payload) {
-
         Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
         ParquetTask parquetTask = new ParquetTask(payload);
-
-        ParquetRecordGTScanner scanner = new ParquetRecordGTScanner4Cube(scanRequest.getInfo(), parquetTask.executeTask(), scanRequest,
-                payload.getMaxScanBytes());
-
-        return scanner;
+        return new ParquetRecordGTScanner4Cube(scanRequest.getInfo(),
+                parquetTask.executeTask(), scanRequest, payload.getMaxScanBytes());
     }
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
index 322aec1..27f005a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
@@ -19,18 +19,30 @@
 package org.apache.kylin.storage.parquet.spark.gtscanner;
 
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
-import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.dimension.DictionaryDimEnc;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.bitmap.BitmapSerializer;
+import org.apache.kylin.measure.dim.DimCountDistincSerializer;
+import org.apache.kylin.measure.extendedcolumn.ExtendedColumnSerializer;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.measure.percentile.PercentileSerializer;
+import org.apache.kylin.measure.raw.RawSerializer;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * this class tracks resource 
@@ -43,20 +55,16 @@ public abstract class ParquetRecordGTScanner implements IGTScanner {
     private ImmutableBitSet columns;
 
     private long maxScannedBytes;
-
     private long scannedRows;
     private long scannedBytes;
 
-    private ImmutableBitSet[] columnBlocks;
-
     public ParquetRecordGTScanner(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
-                                  long maxScannedBytes) {
+            long maxScannedBytes) {
         this.iterator = iterator;
         this.info = info;
         this.gtrecord = new GTRecord(info);
         this.columns = scanRequest.getColumns();
         this.maxScannedBytes = maxScannedBytes;
-        this.columnBlocks = getParquetCoveredColumnBlocks(scanRequest);
     }
 
     @Override
@@ -79,12 +87,40 @@ public abstract class ParquetRecordGTScanner implements IGTScanner {
     @Override
     public Iterator<GTRecord> iterator() {
         return Iterators.transform(iterator, new com.google.common.base.Function<Object[], GTRecord>() {
+            private int maxColumnLength = -1;
+            private ByteBuffer byteBuf = null;
+            private final Map<Integer, Integer> dictCols = Maps.newHashMap();
+            private final Map<Integer, Integer> binaryCols = Maps.newHashMap();
+            private final Map<Integer, Integer> otherCols = Maps.newHashMap();
             @Nullable
             @Override
+            @SuppressWarnings("checkstyle:BooleanExpressionComplexity")
             public GTRecord apply(@Nullable Object[] input) {
-                gtrecord.setValuesParquet(ParquetRecordGTScanner.this.columns, new ByteArray(info.getMaxColumnLength(ParquetRecordGTScanner.this.columns)), input);
+                if (maxColumnLength <= 0) {
+                    maxColumnLength = info.getMaxColumnLength(ParquetRecordGTScanner.this.columns);
+                    for (int i = 0; i < ParquetRecordGTScanner.this.columns.trueBitCount(); i++) {
+                        int c = ParquetRecordGTScanner.this.columns.trueBitAt(i);
+                        DataTypeSerializer serializer = info.getCodeSystem().getSerializer(c);
+                        if (serializer instanceof DictionaryDimEnc.DictionarySerializer) {
+                            dictCols.put(i, c);
+                        } else if (serializer instanceof TopNCounterSerializer || serializer instanceof HLLCSerializer
+                                || serializer instanceof BitmapSerializer
+                                || serializer instanceof ExtendedColumnSerializer
+                                || serializer instanceof PercentileSerializer
+                                || serializer instanceof DimCountDistincSerializer
+                                || serializer instanceof RawSerializer) {
+                            binaryCols.put(i, c);
+                        } else {
+                            otherCols.put(i, c);
+                        }
+                    }
+
+                    byteBuf = ByteBuffer.allocate(maxColumnLength);
+                }
+                byteBuf.clear();
+                GTUtil.setValuesParquet(gtrecord, byteBuf, dictCols, binaryCols, otherCols, input);
 
-                scannedBytes += info.getMaxColumnLength(ParquetRecordGTScanner.this.columns);
+                scannedBytes += maxColumnLength;
                 if ((++scannedRows % GTScanRequest.terminateCheckInterval == 1) && Thread.interrupted()) {
                     throw new KylinTimeoutException("Query timeout");
                 }
@@ -98,8 +134,6 @@ public abstract class ParquetRecordGTScanner implements IGTScanner {
         });
     }
 
-    abstract protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest);
 
-    abstract protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest);
 
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
index 3bf670e..d97531f 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
@@ -18,47 +18,15 @@
 
 package org.apache.kylin.storage.parquet.spark.gtscanner;
 
-import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTScanRequest;
 
-import java.util.BitSet;
 import java.util.Iterator;
 
 public class ParquetRecordGTScanner4Cube extends ParquetRecordGTScanner {
     public ParquetRecordGTScanner4Cube(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
-                                       long maxScannedBytes) {
+            long maxScannedBytes) {
         super(info, iterator, scanRequest, maxScannedBytes);
     }
 
-    protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest) {
-        BitSet bs = new BitSet();
-
-        ImmutableBitSet dimensions = scanRequest.getInfo().getPrimaryKey();
-        for (int i = 0; i < dimensions.trueBitCount(); ++i) {
-            bs.set(dimensions.trueBitAt(i));
-        }
-
-        ImmutableBitSet queriedColumns = scanRequest.getColumns();
-        for (int i = 0; i < queriedColumns.trueBitCount(); ++i) {
-            bs.set(queriedColumns.trueBitAt(i));
-        }
-        return new ImmutableBitSet(bs);
-    }
-
-    protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest) {
-        
-        ImmutableBitSet selectedColBlocksBitSet = scanRequest.getSelectedColBlocks();
-        
-        ImmutableBitSet[] selectedColBlocks = new ImmutableBitSet[selectedColBlocksBitSet.trueBitCount()];
-        
-        for(int i = 0; i < selectedColBlocksBitSet.trueBitCount(); i++) {
-            
-            selectedColBlocks[i] = scanRequest.getInfo().getColumnBlock(selectedColBlocksBitSet.trueBitAt(i));
-            
-        }
-
-        return selectedColBlocks;
-    }
-
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
index c778ee0..a0c1a2d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
@@ -43,7 +43,6 @@ import java.io.IOException;
 import java.util.Map;
 
 /**
- * Created by Yichen on 11/14/18.
  */
 public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritable, Group> {
     private ParquetConvertor convertor;
@@ -51,7 +50,7 @@ public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritab
     private CubeSegment cubeSegment;
 
     @Override
-    protected void doSetup(Context context) throws IOException, InterruptedException {
+    protected void doSetup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
         super.bindCurrentConfiguration(conf);
         mos = new MultipleOutputs(context);
@@ -68,9 +67,9 @@ public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritab
         SerializableConfiguration sConf = new SerializableConfiguration(conf);
 
         Map<TblColRef, String> colTypeMap = Maps.newHashMap();
-        Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
-        ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cube.getDescriptor(), colTypeMap, meaTypeMap);
-        convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, sConf, colTypeMap, meaTypeMap);
+        Map<MeasureDesc, String> measureTypeMap = Maps.newHashMap();
+        ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cube.getDescriptor(), colTypeMap, measureTypeMap);
+        convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, sConf, colTypeMap, measureTypeMap);
     }
 
     @Override
@@ -80,14 +79,10 @@ public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritab
         int partitionId = context.getTaskAttemptID().getTaskID().getId();
 
         for (Text value : values) {
-            try {
-                Group group = convertor.parseValueToGroup(key, value);
-                String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel("", layerNumber)
-                        + "/" + ParquetJobSteps.getCuboidOutputFileName(cuboidId, partitionId);
-                mos.write(MRCubeParquetJob.BY_LAYER_OUTPUT, null, group, output);
-            } catch (IOException e){
-                throw new IOException(e);
-            }
+            Group group = convertor.parseValueToGroup(key, value);
+            String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel("", layerNumber) + "/"
+                    + ParquetJobSteps.getCuboidOutputFileName(cuboidId, partitionId);
+            mos.write(MRCubeParquetJob.BY_LAYER_OUTPUT, null, group, output);
         }
     }
 
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
index 7fcf95d..97309f1 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
@@ -39,7 +39,6 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * Created by Yichen on 11/12/18.
  */
 public class CuboidToPartitionMapping implements Serializable {
     private static final Logger logger = LoggerFactory.getLogger(CuboidToPartitionMapping.class);
@@ -58,31 +57,26 @@ public class CuboidToPartitionMapping implements Serializable {
 
     public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig) throws IOException {
         cuboidPartitions = Maps.newHashMap();
-
         Set<Long> allCuboidIds = cubeSeg.getCuboidScheduler().getAllCuboidIds();
-
-        CalculatePartitionId(cubeSeg, kylinConfig, allCuboidIds);
+        calculatePartitionId(cubeSeg, kylinConfig, allCuboidIds);
     }
 
     public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
         cuboidPartitions = Maps.newHashMap();
-
         List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
-
-        CalculatePartitionId(cubeSeg, kylinConfig, layeredCuboids);
+        calculatePartitionId(cubeSeg, kylinConfig, layeredCuboids);
     }
 
-    private void CalculatePartitionId(CubeSegment cubeSeg, KylinConfig kylinConfig, Collection<Long> cuboidIds) throws IOException {
+    private void calculatePartitionId(CubeSegment cubeSeg, KylinConfig kylinConfig, Collection<Long> cuboidIds)
+            throws IOException {
         int position = 0;
         CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
         for (Long cuboidId : cuboidIds) {
             int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
             List<Integer> positions = Lists.newArrayListWithCapacity(partition);
-
             for (int i = position; i < position + partition; i++) {
                 positions.add(i);
             }
-
             cuboidPartitions.put(cuboidId, positions);
             position = position + partition;
         }
@@ -95,7 +89,9 @@ public class CuboidToPartitionMapping implements Serializable {
     }
 
     public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
-        Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
+        Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping,
+                new TypeReference<Map<Long, List<Integer>>>() {
+                });
         return new CuboidToPartitionMapping(cuboidPartitions);
     }
 
@@ -137,9 +133,7 @@ public class CuboidToPartitionMapping implements Serializable {
     public String getPartitionFilePrefix(int partition) {
         long cuboid = getCuboidIdByPartition(partition);
         int partNum = partition % getPartitionNumForCuboidId(cuboid);
-        String prefix = ParquetJobSteps.getCuboidOutputFileName(cuboid, partNum);
-
-        return prefix;
+        return ParquetJobSteps.getCuboidOutputFileName(cuboid, partNum);
     }
 
     private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
@@ -157,7 +151,8 @@ public class CuboidToPartitionMapping implements Serializable {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
-            sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
+            sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ","))
+                    .append("]\n");
         }
 
         return sb.toString();
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
index 7113a7a..a1d74fe 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
@@ -48,13 +48,12 @@ import java.io.IOException;
 
 
 /**
- * Created by Yichen on 11/9/18.
  */
 public class MRCubeParquetJob extends AbstractHadoopJob {
 
     protected static final Logger logger = LoggerFactory.getLogger(MRCubeParquetJob.class);
 
-    final static String BY_LAYER_OUTPUT = "ByLayer";
+    public static final String BY_LAYER_OUTPUT = "ByLayer";
     private Options options;
 
     public MRCubeParquetJob(){
@@ -73,7 +72,7 @@ public class MRCubeParquetJob extends AbstractHadoopJob {
         final Path inputPath = new Path(getOptionValue(OPTION_INPUT_PATH));
         final Path outputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
         final String cubeName = getOptionValue(OPTION_CUBE_NAME);
-        logger.info("CubeName: ", cubeName);
+        logger.info("CubeName: {}", cubeName);
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
@@ -94,13 +93,10 @@ public class MRCubeParquetJob extends AbstractHadoopJob {
         Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
 
         MessageType schema = ParquetConvertor.cuboidToMessageType(baseCuboid, dimEncMap, cubeSegment.getCubeDesc());
-        logger.info("Schema: {}", schema.toString());
+        logger.info("Schema: {}", schema);
 
         try {
-
             job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
-
-
             addInputDirs(inputPath.toString(), job);
             FileOutputFormat.setOutputPath(job, outputPath);
 
@@ -145,7 +141,7 @@ public class MRCubeParquetJob extends AbstractHadoopJob {
             try {
                 mapping = CuboidToPartitionMapping.deserialize(conf.get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new IllegalArgumentException(e);
             }
         }
 
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
index 9b9578d..b3be22d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
@@ -64,7 +64,6 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Created by Yichen on 11/9/18.
  */
 public class ParquetConvertor {
     private static final Logger logger = LoggerFactory.getLogger(ParquetConvertor.class);
@@ -121,7 +120,7 @@ public class ParquetConvertor {
         int valueOffset = 0;
         for (int i = 0; i < valueLengths.length; ++i) {
             MeasureDesc measureDesc = measureDescs.get(i);
-            parseMeaValue(group, measureDesc, rawValue.getBytes(), valueOffset, valueLengths[i]);
+            parseMeasureValue(group, measureDesc, rawValue.getBytes(), valueOffset, valueLengths[i]);
             valueOffset += valueLengths[i];
         }
 
@@ -146,7 +145,7 @@ public class ParquetConvertor {
         }
     }
 
-    private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) throws IOException {
+    private void parseMeasureValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) {
         if (value==null) {
             logger.error("value is null");
             return;
@@ -161,10 +160,10 @@ public class ParquetConvertor {
             case DATATYPE_DECIMAL:
                 BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
                 decimal = decimal.setScale(4);
-                group.append(measureDesc.getName(), Binary.fromByteArray(decimal.unscaledValue().toByteArray()));
+                group.append(measureDesc.getName(), Binary.fromReusedByteArray(decimal.unscaledValue().toByteArray()));
                 break;
             default:
-                group.append(measureDesc.getName(), Binary.fromByteArray(value, offset, length));
+                group.append(measureDesc.getName(), Binary.fromReusedByteArray(value, offset, length));
                 break;
         }
     }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
index 625737a..f097a6a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
@@ -22,7 +22,6 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.execution.AbstractExecutable;
 
-
 /**
  * Common steps for building cube into Parquet
  */
@@ -33,8 +32,7 @@ public abstract class ParquetJobSteps extends JobBuilderSupport {
     }
 
     public static String getCuboidOutputFileName(long cuboid, int partNum) {
-        String fileName = "cuboid_" + cuboid + "_part" + partNum;
-        return fileName;
+        return "cuboid_" + cuboid + "_part" + partNum;
     }
-    abstract public AbstractExecutable convertToParquetStep(String jobId);
+    public abstract AbstractExecutable convertToParquetStep(String jobId);
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
index f54a7a0..00f8e36 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
@@ -32,20 +32,14 @@ import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
 import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
 import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
 
-
 /**
- * Created by Yichen on 10/16/18.
  */
 public class ParquetMROutput implements IMROutput2 {
 
-    private static final Logger logger = LoggerFactory.getLogger(ParquetMROutput.class);
-
     @Override
     public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg) {
 
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
index e1d0b0f..bfa5ce6 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
@@ -25,7 +25,6 @@ import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
 
 /**
- * Created by Yichen on 11/8/18.
  */
 public class ParquetMRSteps extends ParquetJobSteps{
     public ParquetMRSteps(CubeSegment seg) {
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
index 4ad7cb3..c6ea2ac 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
@@ -63,8 +63,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
-
-public class SparkCubeParquet extends AbstractApplication implements Serializable{
+public class SparkCubeParquet extends AbstractApplication implements Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(SparkCubeParquet.class);
 
@@ -78,12 +77,12 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             .isRequired(true).withDescription("Paqruet output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
-    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
-            .isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT)
+            .hasArg().isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
-    public SparkCubeParquet(){
+    public SparkCubeParquet() {
         options = new Options();
         options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_CUBE_NAME);
@@ -107,17 +106,18 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
 
-        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Text.class, Group.class};
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Text.class,
+                Group.class };
 
-        SparkConf conf = new SparkConf().setAppName("Converting Parquet File for: " + cubeName + " segment " + segmentId);
+        SparkConf conf = new SparkConf()
+                .setAppName("Converting Parquet File for: " + cubeName + " segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
-
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
-        try (JavaSparkContext sc = new JavaSparkContext(conf)){
+        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
             sc.sc().addSparkListener(jobListener);
 
             HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
@@ -147,10 +147,10 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cubeSegment.getCubeDesc(), colTypeMap, meaTypeMap);
             GroupWriteSupport.setSchema(schema, job.getConfiguration());
 
-            GenerateGroupRDDFunction groupPairFunction = new GenerateGroupRDDFunction(cubeName, cubeSegment.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap);
-
+            GenerateGroupRDDFunction groupPairFunction = new GenerateGroupRDDFunction(cubeName, cubeSegment.getUuid(),
+                    metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap);
 
-            logger.info("Schema: {}", schema.toString());
+            logger.info("Schema: {}", schema);
 
             // Read from cuboid and save to parquet
             for (int level = 0; level <= totalLevels; level++) {
@@ -162,7 +162,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
 
             Map<String, String> counterMap = Maps.newHashMap();
-            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN,
+                    String.valueOf(jobListener.metrics.getBytesWritten()));
 
             // save counter to hdfs
             HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
@@ -170,10 +171,12 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
 
     }
 
-    protected void saveToParquet(JavaPairRDD<Text, Text> rdd, GenerateGroupRDDFunction groupRDDFunction, CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws IOException {
-        final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
+    protected void saveToParquet(JavaPairRDD<Text, Text> rdd, GenerateGroupRDDFunction groupRDDFunction,
+            CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws IOException {
+        final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig,
+                level);
 
-        logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
+        logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping);
 
         String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(parquetOutput, level);
 
@@ -182,7 +185,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
         CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
 
-        JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping)).mapToPair(groupRDDFunction);
+        JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping))
+                .mapToPair(groupRDDFunction);
 
         groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
     }
@@ -201,7 +205,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
 
         @Override
         public int getPartition(Object key) {
-            Text textKey = (Text)key;
+            Text textKey = (Text) key;
             return mapping.getPartitionByKey(textKey.getBytes());
         }
     }
@@ -210,16 +214,19 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
 
         @Override
         public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
-            FileOutputCommitter committer = (FileOutputCommitter)this.getOutputCommitter(context);
+            FileOutputCommitter committer = (FileOutputCommitter) this.getOutputCommitter(context);
             TaskID taskId = context.getTaskAttemptID().getTaskID();
             int partition = taskId.getId();
 
-            CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
+            CuboidToPartitionMapping mapping = CuboidToPartitionMapping
+                    .deserialize(context.getConfiguration().get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
 
-            return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
+            return new Path(committer.getWorkPath(), getUniqueFile(context,
+                    mapping.getPartitionFilePrefix(partition) + "-" + getOutputName(context), extension));
         }
 
-        public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
+        public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping)
+                throws IOException {
             String jsonStr = cuboidToPartitionMapping.serialize();
 
             job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
@@ -227,7 +234,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
     }
 
     static class GenerateGroupRDDFunction implements PairFunction<Tuple2<Text, Text>, Void, Group> {
-        private volatile transient boolean initialized = false;
+        private transient volatile boolean initialized = false;
         private String cubeName;
         private String segmentId;
         private String metaUrl;
@@ -237,7 +244,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
 
         private transient ParquetConvertor convertor;
 
-        public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+        public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl,
+                SerializableConfiguration conf, Map<TblColRef, String> colTypeMap,
+                Map<MeasureDesc, String> meaTypeMap) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
             this.metaUrl = metaurl;
@@ -253,9 +262,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
 
         @Override
         public Tuple2<Void, Group> call(Tuple2<Text, Text> tuple) throws Exception {
-            if (initialized == false) {
+            if (!initialized) {
                 synchronized (SparkCubeParquet.class) {
-                    if (initialized == false) {
+                    if (!initialized) {
                         init();
                         initialized = true;
                     }
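
    Illustration only (not part of this commit): the GenerateGroupRDDFunction change above keeps
    the existing double-checked lazy initialization and only tightens the boolean tests. A minimal
    sketch of that idiom, with a plain Object standing in for the transient ParquetConvertor that
    is rebuilt lazily on the executor side:

        public class LazyInitFunctionSketch {
            private transient volatile boolean initialized = false;
            private transient Object helper; // stand-in for the transient, lazily built helper

            private void init() {
                helper = new Object(); // build the expensive helper here
            }

            public void apply() {
                if (!initialized) {                               // cheap check outside the lock
                    synchronized (LazyInitFunctionSketch.class) { // lock only on the slow path
                        if (!initialized) {                       // re-check under the lock
                            init();
                            initialized = true;
                        }
                    }
                }
                // ... use helper ...
            }
        }
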
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
index a0c212c..88012c8 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
@@ -113,7 +113,7 @@ public class DebugTomcatClassLoader extends ParallelWebappClassLoader {
             return sparkClassLoader.loadClass(name);
         }
         if (isParentCLPrecedent(name)) {
-            logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
+            logger.trace("Skipping exempt class " + name + " - delegating directly to parent");
             return parent.loadClass(name);
         }
         return super.loadClass(name, resolve);
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
index 8fe211e..1cdfbc6 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
@@ -159,11 +159,11 @@ public class SparkClassLoader extends URLClassLoader {
             // Check whether the class has already been loaded:
             Class<?> clasz = findLoadedClass(name);
             if (clasz != null) {
-                logger.debug("Class " + name + " already loaded");
+                logger.trace("Class " + name + " already loaded");
             } else {
                 try {
                     // Try to find this class using the URLs passed to this ClassLoader
-                    logger.debug("Finding class: " + name);
+                    logger.trace("Finding class: " + name);
                     clasz = super.findClass(name);
                     if (clasz == null) {
                         logger.debug("cannot find class" + name);
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 5c2a839..e01c3c7 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -230,8 +230,8 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     protected void renameFoldersInHdfs(CubeInstance cube) throws IOException {
-        IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStorageSystemPathBuilderClz());
-        IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStorageSystemPathBuilderClz());
+        IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStoragePathBuilder());
+        IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStoragePathBuilder());
 
         for (CubeSegment segment : cube.getSegments()) {
 
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index 42e4c34..34358d7 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -29,7 +29,7 @@ KylinApp.constant('cubeConfig', {
   ],
   storageTypes: [
     {name: 'HBase', value: 2},
-    {name: 'Parquet', value: 4}
+    {name: 'Parquet (alpha)', value: 4}
   ],
   joinTypes: [
     {name: 'Left', value: 'left'},
@@ -204,4 +204,4 @@ KylinApp.constant('cubeConfig', {
       'left': '-12px'
     }
   }
-});
\ No newline at end of file
+});