Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:25 UTC

[kylin] 01/06: KYLIN-3622,KYLIN-3624 Cube layout in Parquet; convert cube data to parquet in Spark

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9954d36e39f5d0f3868a712bfc9e1bbde8a071ce
Author: chao long <wa...@qq.com>
AuthorDate: Sat Sep 29 14:05:30 2018 +0800

    KYLIN-3622,KYLIN-3624 Cube layout in Parquet; convert cube data to parquet in Spark
---
 .../src/main/resources/kylin-defaults.properties   |  26 ++
 .../org/apache/kylin/cube/kv/RowKeyColumnIO.java   |   3 +
 .../apache/kylin/measure/BufferedMeasureCodec.java |   4 +
 .../apache/kylin/metadata/model/IStorageAware.java |   1 +
 .../kylin/engine/mr/common/CubeStatsReader.java    |   5 +
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  21 +-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  20 +-
 .../engine/spark/SparkCubingByLayerParquet.java    | 421 +++++++++++++++++++++
 pom.xml                                            |   6 +
 server-base/pom.xml                                |   4 +
 server/pom.xml                                     |   6 +
 .../kylin/storage/parquet/ParquetStorage.java      |  49 +++
 .../storage/parquet/cube/CubeStorageQuery.java     |  33 +-
 .../storage/parquet/steps/ParquetSparkOutput.java  |  72 ++++
 webapp/app/js/controllers/cubeSchema.js            |   9 +-
 webapp/app/js/model/cubeConfig.js                  |   4 +
 .../partials/cubeDesigner/advanced_settings.html   |  15 +-
 17 files changed, 684 insertions(+), 15 deletions(-)

diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 6f2db9a..6238e44 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -61,6 +61,9 @@ kylin.restclient.connection.default-max-per-route=20
 #max connections of one rest-client
 kylin.restclient.connection.max-total=200
 
+## Parquet storage
+kylin.storage.provider.4=org.apache.kylin.storage.parquet.ParquetStorage
+
 ### PUBLIC CONFIG ###
 kylin.engine.default=2
 kylin.storage.default=2
@@ -351,3 +354,26 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
 #kylin.source.jdbc.pass=
 #kylin.source.jdbc.sqoop-home=
 #kylin.source.jdbc.filed-delimiter=|
+
+kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
+## for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kylin.storage.columnar.spark-conf" and append here
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dzipkin.collector-hostname=${ZIPKIN_HOSTNAME} -Dzipkin.collector-port=${ZIPKIN_SCRIBE_PORT} -DinfluxDB.address=${INFLUXDB_ADDRESS} -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkap.spark.identifier=${KAP_SPARK_IDENTIFIER} -Dkap.hdfs.working.dir=${KAP_HDFS_WORKING_DIR} -Dkap.metadata.url=${KAP_METADATA_IDENTIFIER} -XX:MaxDirectMemorySize=896M -Dsparder.dict.cache.size=${SPARDER [...]
+kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
+#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
+kylin.storage.columnar.spark-conf.spark.driver.memory=512m
+kylin.storage.columnar.spark-conf.spark.executor.memory=512m
+kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=512
+kylin.storage.columnar.spark-conf.yarn.am.memory=512m
+kylin.storage.columnar.spark-conf.spark.executor.cores=1
+kylin.storage.columnar.spark-conf.spark.executor.instances=1
+kylin.storage.columnar.spark-conf.spark.task.maxFailures=1
+kylin.storage.columnar.spark-conf.spark.ui.port=4041
+kylin.storage.columnar.spark-conf.spark.locality.wait=0s
+kylin.storage.columnar.spark-conf.spark.sql.dialect=hiveql
+kylin.storage.columnar.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+kylin.storage.columnar.spark-conf.hive.execution.engine=MR
+kylin.storage.columnar.spark-conf.spark.scheduler.listenerbus.eventqueue.size=100000000
+kylin.storage.columnar.spark-conf.spark.master=yarn-client
+kylin.storage.columnar.spark-conf.spark.broadcast.compress=false
+
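The new properties above wire storage type 4 to the Parquet storage provider and pass per-job Spark settings through the "kylin.storage.columnar.spark-conf." prefix: per the comment in the hunk, any Spark property can be appended after that prefix. A minimal sketch of that prefix handling, assuming the properties are available as a java.util.Properties instance (the class and method names below are illustrative, not Kylin's actual implementation):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class SparkConfPrefixDemo {
        private static final String PREFIX = "kylin.storage.columnar.spark-conf.";

        // Collect every "kylin.storage.columnar.spark-conf.X=Y" entry as a plain "X=Y" Spark property.
        static Map<String, String> extractSparkConf(Properties props) {
            Map<String, String> sparkConf = new HashMap<>();
            for (String name : props.stringPropertyNames()) {
                if (name.startsWith(PREFIX)) {
                    sparkConf.put(name.substring(PREFIX.length()), props.getProperty(name));
                }
            }
            return sparkConf;
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(PREFIX + "spark.executor.memory", "512m");
            props.setProperty(PREFIX + "spark.master", "yarn-client");
            System.out.println(extractSparkConf(props)); // {spark.master=yarn-client, spark.executor.memory=512m}
        }
    }
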
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
index 65911a0..b0efc91 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.cube.kv;
 
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dimension.DictionaryDimEnc;
 import org.apache.kylin.dimension.DimensionEncoding;
@@ -57,6 +58,8 @@ public class RowKeyColumnIO implements java.io.Serializable {
 
     public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
         DimensionEncoding dimEnc = dimEncMap.get(col);
+        if (dimEnc instanceof DictionaryDimEnc)
+            return String.valueOf(BytesUtil.readUnsigned(bytes, offset, length));
         return dimEnc.decode(bytes, offset, length);
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
index 44e5708..ec8069f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
@@ -40,6 +40,10 @@ public class BufferedMeasureCodec implements java.io.Serializable {
     private transient ByteBuffer buf;
     final private int[] measureSizes;
 
+    public MeasureCodec getCodec() {
+        return codec;
+    }
+
     public BufferedMeasureCodec(Collection<MeasureDesc> measureDescs) {
         this.codec = new MeasureCodec(measureDescs);
         this.measureSizes = new int[codec.getMeasuresCount()];
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
index e552574..3e7de24 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
@@ -23,6 +23,7 @@ public interface IStorageAware {
     public static final int ID_HBASE = 0;
     public static final int ID_HYBRID = 1;
     public static final int ID_SHARDED_HBASE = 2;
+    public static final int ID_PARQUET = 4;
 
     int getStorageType();
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 102995e..f6579ef 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -307,6 +307,11 @@ public class CubeStatsReader {
         return ret;
     }
 
+    public double estimateCuboidSize(long cuboidId) {
+        Map<Long, Double> cuboidSizeMap = getCuboidSizeMap();
+        return cuboidSizeMap.get(cuboidId);
+    }
+
     public List<Long> getCuboidsByLayer(int level) {
         if (cuboidScheduler == null) {
             throw new UnsupportedOperationException("cuboid scheduler is null");
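The new estimateCuboidSize() helper unboxes the Double returned by getCuboidSizeMap().get(cuboidId), so a cuboid id missing from the map would surface as a NullPointerException at the call site. A minimal caller-side guard, assuming a fallback of 0 MB is acceptable (the fallback is an assumption, not part of this commit):

    // Hypothetical defensive lookup around the patched CubeStatsReader.
    Double estimatedMB = cubeStatsReader.getCuboidSizeMap().get(cuboidId);
    double cuboidSizeMB = (estimatedMB != null) ? estimatedMB : 0d; // avoid NPE on auto-unboxing
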
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 3f3c14d..41e4e49 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -35,6 +35,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kylin.metadata.model.IStorageAware.ID_PARQUET;
+
 /**
  */
 public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -73,11 +75,16 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         // add materialize lookup tables if needed
         LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
 
-        outputSide.addStepPhase2_BuildDictionary(result);
+        if (seg.getStorageType() != ID_PARQUET) {
+            outputSide.addStepPhase2_BuildDictionary(result);
+        }
 
         // Phase 3: Build Cube
         addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
-        outputSide.addStepPhase3_BuildCube(result);
+
+        if (seg.getStorageType() != ID_PARQUET) {
+            outputSide.addStepPhase3_BuildCube(result);
+        }
 
         // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
@@ -116,7 +123,11 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
 
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final SparkExecutable sparkExecutable = new SparkExecutable();
-        sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
+        if (seg.getStorageType() == ID_PARQUET) {
+            sparkExecutable.setClassName(SparkCubingByLayerParquet.class.getName());
+        } else {
+            sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
+        }
         configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath);
         result.addTask(sparkExecutable);
     }
@@ -142,6 +153,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
         sparkExecutable.setJars(jars.toString());
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
+
+        if (seg.getStorageType() == ID_PARQUET) {
+            sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOuputPath(jobId));
+        }
     }
 
     public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 2f453c5..ed35f54 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -23,7 +23,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -55,6 +57,7 @@ import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
@@ -66,6 +69,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.storage.StorageLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +95,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(false).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
@@ -102,6 +108,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_COUNTER_PATH);
     }
 
     @Override
@@ -117,6 +124,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
 
@@ -128,6 +136,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
         KylinSparkJobListener jobListener = new KylinSparkJobListener();
         JavaSparkContext sc = new JavaSparkContext(conf);
+
         sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
         SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
@@ -205,8 +214,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
         allRDDs[totalLevels].unpersist();
         logger.info("Finished on calculating all level cuboids.");
+
+        // only parquet storage work
         logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
-        //HadoopUtil.deleteHDFSMeta(metaUrl);
+
+        if (counterPath != null) {
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+        }
     }
 
     protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
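With a counter path supplied, the Spark job persists its HDFS-bytes-written metric as a small sequence file that the job server later reads back (see the setCounterSaveAs call in SparkBatchCubingJobBuilder2 above). A minimal sketch of reading such a counter file with the plain Hadoop SequenceFile API, assuming Text keys and values (the key/value types are an assumption, not stated in this diff):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class CounterFileReader {
        // Read the "counter name -> value" pairs written at counterPath by the cubing job.
        static Map<String, String> readCounters(Configuration conf, String counterPath) throws IOException {
            Map<String, String> counters = new HashMap<>();
            try (SequenceFile.Reader reader =
                    new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(counterPath)))) {
                Text key = new Text();
                Text value = new Text();
                while (reader.next(key, value)) {
                    counters.put(key.toString(), value.toString());
                }
            }
            return counters;
        }
    }
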
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
new file mode 100644
index 0000000..d8fccf9
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dimension.AbstractDateDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.FixedLenDimEnc;
+import org.apache.kylin.dimension.FixedLenHexDimEnc;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.measure.basic.BigDecimalIngester;
+import org.apache.kylin.measure.basic.DoubleIngester;
+import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class SparkCubingByLayerParquet extends SparkCubingByLayer {
+    @Override
+    protected void saveToHDFS(JavaPairRDD<ByteArray, Object[]> rdd, String metaUrl, String cubeName, CubeSegment cubeSeg, String hdfsBaseLocation, int level, Job job, KylinConfig kylinConfig) throws Exception {
+        final IDimensionEncodingMap dimEncMap = cubeSeg.getDimensionEncodingMap();
+
+        Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSeg.getCubeDesc());
+
+        final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
+        final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
+
+        MessageType schema = cuboidToMessageType(baseCuboid, dimEncMap, cubeSeg.getCubeDesc(), colTypeMap, meaTypeMap);
+
+        logger.info("Schema: {}", schema.toString());
+
+        final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
+
+        logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
+
+        JavaPairRDD<ByteArray, Object[]> repartitionedRDD = rdd.repartitionAndSortWithinPartitions(new CuboidPartitioner(cuboidToPartitionMapping));
+
+        String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+
+        job.setOutputFormatClass(CustomParquetOutputFormat.class);
+        GroupWriteSupport.setSchema(schema, job.getConfiguration());
+        CustomParquetOutputFormat.setOutputPath(job, new Path(output));
+        CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
+        CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
+
+        JavaPairRDD<Void, Group> groupRDD = repartitionedRDD.mapToPair(new GenerateGroupRDDFunction(cubeName, cubeSeg.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap));
+
+        groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
+    }
+
+    static class CuboidPartitioner extends Partitioner {
+
+        private CuboidToPartitionMapping mapping;
+
+        public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
+            this.mapping = cuboidToPartitionMapping;
+        }
+
+        @Override
+        public int numPartitions() {
+            return mapping.getNumPartitions();
+        }
+
+        @Override
+        public int getPartition(Object key) {
+            ByteArray byteArray = (ByteArray) key;
+            long cuboidId = Bytes.toLong(byteArray.array(), RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+
+            return mapping.getRandomPartitionForCuboidId(cuboidId);
+        }
+    }
+
+    public static class CuboidToPartitionMapping implements Serializable {
+        private Map<Long, List<Integer>> cuboidPartitions;
+        private int partitionNum;
+
+        public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
+            this.cuboidPartitions = cuboidPartitions;
+            int partitions = 0;
+            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+                partitions = partitions + entry.getValue().size();
+            }
+            this.partitionNum = partitions;
+        }
+
+        public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
+            cuboidPartitions = Maps.newHashMap();
+
+            List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
+            CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
+
+            int position = 0;
+            for (Long cuboidId : layeredCuboids) {
+                int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
+                List<Integer> positions = Lists.newArrayListWithCapacity(partition);
+
+                for (int i = position; i < position + partition; i++) {
+                    positions.add(i);
+                }
+
+                cuboidPartitions.put(cuboidId, positions);
+                position = position + partition;
+            }
+
+            this.partitionNum = position;
+        }
+
+        public String serialize() throws JsonProcessingException {
+            return JsonUtil.writeValueAsString(cuboidPartitions);
+        }
+
+        public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
+            Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
+            return new CuboidToPartitionMapping(cuboidPartitions);
+        }
+
+        public int getNumPartitions() {
+            return this.partitionNum;
+        }
+
+        public long getCuboidIdByPartition(int partition) {
+            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+                if (entry.getValue().contains(partition)) {
+                    return entry.getKey();
+                }
+            }
+
+            throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
+        }
+
+        public int getRandomPartitionForCuboidId(long cuboidId) {
+            List<Integer> partitions = cuboidPartitions.get(cuboidId);
+            return partitions.get(new Random().nextInt(partitions.size()));
+        }
+
+        public int getPartitionNumForCuboidId(long cuboidId) {
+            return cuboidPartitions.get(cuboidId).size();
+        }
+
+        public String getPartitionFilePrefix(int partition) {
+            String prefix = "cuboid_";
+            long cuboid = getCuboidIdByPartition(partition);
+            int partNum = partition % getPartitionNumForCuboidId(cuboid);
+            prefix = prefix + cuboid + "_part" + partNum;
+
+            return prefix;
+        }
+
+        private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
+            double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
+            float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+            int partition = (int) (cuboidSize / rddCut);
+            partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+            partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+            return partition;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+                sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
+            }
+
+            return sb.toString();
+        }
+    }
+
+    public static class CustomParquetOutputFormat extends ParquetOutputFormat {
+        public static final String CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
+
+        @Override
+        public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
+            FileOutputCommitter committer = (FileOutputCommitter)this.getOutputCommitter(context);
+            TaskID taskId = context.getTaskAttemptID().getTaskID();
+            int partition = taskId.getId();
+
+            CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(CUBOID_TO_PARTITION_MAPPING));
+
+            return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
+        }
+
+        public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
+            String jsonStr = cuboidToPartitionMapping.serialize();
+
+            job.getConfiguration().set(CUBOID_TO_PARTITION_MAPPING, jsonStr);
+        }
+    }
+
+    static class GenerateGroupRDDFunction implements PairFunction<Tuple2<ByteArray, Object[]>, Void, Group> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private List<MeasureDesc> measureDescs;
+        private RowKeyDecoder decoder;
+        private Map<TblColRef, String> colTypeMap;
+        private Map<MeasureDesc, String> meaTypeMap;
+        private GroupFactory factory;
+        private BufferedMeasureCodec measureCodec;
+
+        public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.colTypeMap = colTypeMap;
+            this.meaTypeMap = meaTypeMap;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            measureDescs = cubeDesc.getMeasures();
+            decoder = new RowKeyDecoder(cubeSegment);
+            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
+            measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+        }
+
+        @Override
+        public Tuple2<Void, Group> call(Tuple2<ByteArray, Object[]> tuple) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkCubingByLayer.class) {
+                    if (initialized == false) {
+                        init();
+                    }
+                }
+            }
+
+            long cuboid = decoder.decode(tuple._1.array());
+            List<String> values = decoder.getValues();
+            List<TblColRef> columns = decoder.getColumns();
+
+            Group group = factory.newGroup();
+
+            // for check
+            group.append("cuboidId", cuboid);
+
+            for (int i = 0; i < columns.size(); i++) {
+                TblColRef column = columns.get(i);
+                parseColValue(group, column, values.get(i));
+            }
+
+            ByteBuffer valueBuf = measureCodec.encode(tuple._2());
+            byte[] encodedBytes = new byte[valueBuf.position()];
+            System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+
+            int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
+
+            int valueOffset = 0;
+            for (int i = 0; i < valueLengths.length; ++i) {
+                MeasureDesc measureDesc = measureDescs.get(i);
+                parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i]);
+                valueOffset += valueLengths[i];
+            }
+
+            return new Tuple2<>(null, group);
+        }
+
+        private void parseColValue(final Group group, final TblColRef colRef, final String value) {
+            switch (colTypeMap.get(colRef)) {
+                case "int":
+                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
+                    break;
+                case "long":
+                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
+                    break;
+                default:
+                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
+                    break;
+            }
+        }
+
+        private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) {
+            switch (meaTypeMap.get(measureDesc)) {
+                case "long":
+                    group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
+                    break;
+                case "double":
+                    group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
+                    break;
+                default:
+                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
+                    break;
+            }
+        }
+    }
+
+    private MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+
+        List<TblColRef> colRefs = cuboid.getColumns();
+
+        builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId");
+
+        for (TblColRef colRef : colRefs) {
+            DimensionEncoding dimEnc = dimEncMap.get(colRef);
+
+            if (dimEnc instanceof AbstractDateDimEnc) {
+                builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+                colTypeMap.put(colRef, "long");
+            } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
+                org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
+                if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
+                    builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+                    colTypeMap.put(colRef, "long");
+                } else {
+                    // stringFamily && default
+                    builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
+                    colTypeMap.put(colRef, "string");
+                }
+            } else {
+                builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
+                colTypeMap.put(colRef, "int");
+            }
+        }
+
+        MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
+            org.apache.kylin.metadata.datatype.DataType meaDataType = measureDesc.getFunction().getReturnDataType();
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
+
+            if (measureType instanceof BasicMeasureType) {
+                MeasureIngester measureIngester = aggrIngesters[i];
+                if (measureIngester instanceof LongIngester) {
+                    builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "long");
+                } else if (measureIngester instanceof DoubleIngester) {
+                    builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "double");
+                } else if (measureIngester instanceof BigDecimalIngester) {
+                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "decimal");
+                } else {
+                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "binary");
+                }
+            } else {
+                builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
+                meaTypeMap.put(measureDesc, "binary");
+            }
+        }
+
+        return builder.named(String.valueOf(cuboid.getId()));
+    }
+
+    private String getColName(TblColRef colRef) {
+        return colRef.getTableAlias() + "_" + colRef.getName();
+    }
+}
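The core of the Parquet layer above is CuboidToPartitionMapping: each cuboid in a layer gets a contiguous block of partition ids sized from its estimated footprint (estimateCuboidSize divided by the Spark RDD partition cut, clamped to the configured min/max), so repartitionAndSortWithinPartitions lands every cuboid in its own Parquet files named cuboid_<id>_part<n>. A small sketch of the mapping, built here from hand-picked partition assignments rather than a real CubeSegment (the cuboid ids and sizes are assumptions for illustration):

    import java.util.List;
    import java.util.Map;

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;

    import org.apache.kylin.engine.spark.SparkCubingByLayerParquet;

    public class CuboidMappingDemo {
        public static void main(String[] args) throws Exception {
            // Assume cuboid 15 was estimated at ~25 MB and cuboid 7 at ~5 MB with a 10 MB
            // partition cut: (int) (25 / 10) = 2 partitions, (int) (5 / 10) = 0 clamped up to 1.
            Map<Long, List<Integer>> cuboidPartitions = Maps.newHashMap();
            cuboidPartitions.put(15L, Lists.newArrayList(0, 1));
            cuboidPartitions.put(7L, Lists.newArrayList(2));

            SparkCubingByLayerParquet.CuboidToPartitionMapping mapping =
                    new SparkCubingByLayerParquet.CuboidToPartitionMapping(cuboidPartitions);

            System.out.println(mapping.getNumPartitions());                 // 3
            System.out.println(mapping.getCuboidIdByPartition(2));          // 7
            System.out.println(mapping.getPartitionFilePrefix(1));          // cuboid_15_part1
            System.out.println(mapping.getRandomPartitionForCuboidId(15L)); // 0 or 1

            // CustomParquetOutputFormat carries the mapping to the output tasks as JSON
            // in the job Configuration; the round trip below mirrors that.
            String json = mapping.serialize();
            SparkCubingByLayerParquet.CuboidToPartitionMapping restored =
                    SparkCubingByLayerParquet.CuboidToPartitionMapping.deserialize(json);
            System.out.println(restored.getNumPartitions());                // 3
        }
    }
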
diff --git a/pom.xml b/pom.xml
index 680144f..3585927 100644
--- a/pom.xml
+++ b/pom.xml
@@ -359,6 +359,12 @@
         <version>${project.version}</version>
         <type>test-jar</type>
       </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-parquet</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
       <dependency>
         <groupId>org.apache.kylin</groupId>
         <artifactId>kylin-server-base</artifactId>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 4cd3f76..29b193b 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -66,6 +66,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-parquet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-source-hive</artifactId>
         </dependency>
         <dependency>
diff --git a/server/pom.xml b/server/pom.xml
index b1365a7..a898eff 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -102,6 +102,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-parquet</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-server-base</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
new file mode 100644
index 0000000..cddce23
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.spark.ISparkOutput;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.IStorage;
+import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.parquet.cube.CubeStorageQuery;
+import org.apache.kylin.storage.parquet.steps.ParquetSparkOutput;
+
+public class ParquetStorage implements IStorage {
+    @Override
+    public IStorageQuery createQuery(IRealization realization) {
+        if (realization.getType() == RealizationType.CUBE) {
+            return new CubeStorageQuery((CubeInstance) realization);
+        } else {
+            throw new IllegalStateException(
+                    "Unsupported realization type for ParquetStorage: " + realization.getType());
+        }
+    }
+
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == ISparkOutput.class) {
+            return (I) new ParquetSparkOutput();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+}
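ParquetStorage plugs into the existing IStorage extension point: queries go through CubeStorageQuery, and the Spark build engine obtains its output side through adaptToBuildEngine. A minimal sketch of that adapter call, using only what this diff defines and instantiating the class directly rather than through Kylin's storage-provider lookup:

    import org.apache.kylin.engine.spark.ISparkOutput;
    import org.apache.kylin.storage.IStorage;
    import org.apache.kylin.storage.parquet.ParquetStorage;

    public class ParquetStorageAdapterDemo {
        public static void main(String[] args) {
            IStorage storage = new ParquetStorage();
            // Returns a ParquetSparkOutput whose dictionary/build/cleanup steps are no-ops,
            // since the Parquet path writes cube data inside the Spark cubing step itself.
            ISparkOutput output = storage.adaptToBuildEngine(ISparkOutput.class);
            System.out.println(output.getClass().getSimpleName()); // ParquetSparkOutput
        }
    }
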
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
similarity index 50%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
copy to storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
index e552574..6a3ad59 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
@@ -6,23 +6,38 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-package org.apache.kylin.metadata.model;
+package org.apache.kylin.storage.parquet.cube;
 
-public interface IStorageAware {
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 
-    public static final int ID_HBASE = 0;
-    public static final int ID_HYBRID = 1;
-    public static final int ID_SHARDED_HBASE = 2;
+public class CubeStorageQuery extends GTCubeStorageQueryBase {
 
-    int getStorageType();
+    public CubeStorageQuery(CubeInstance cube) {
+        super(cube);
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        return super.search(context, sqlDigest, returnTupleInfo);
+    }
+
+    @Override
+    protected String getGTStorage() {
+        return null;
+    }
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
new file mode 100644
index 0000000..6f82d8b
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.spark.ISparkOutput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+import java.util.List;
+
+public class ParquetSparkOutput implements ISparkOutput {
+    @Override
+    public ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        return new ISparkBatchCubingOutputSide() {
+            @Override
+            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+
+            }
+        };
+    }
+
+    @Override
+    public ISparkBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+        return new ISparkBatchMergeOutputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
+
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+
+            }
+        };
+    }
+
+    @Override
+    public ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(CubeSegment seg) {
+        return null;
+    }
+}
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index 5eed9b9..0ac8a96 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -18,7 +18,7 @@
 
 'use strict';
 
-KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager,StreamingService,CubeService,VdmUtil) {
+KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager,StreamingService,CubeService,VdmUtil, kylinConfig) {
     $scope.modelsManager = modelsManager;
     $scope.cubesManager = cubesManager;
     $scope.projects = [];
@@ -419,6 +419,13 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
             }
         });
 
+        // set default value for null type
+        if ($scope.cubeMetaFrame.engine_type === null) {
+            $scope.cubeMetaFrame.engine_type = kylinConfig.getCubeEng();
+        }
+        if ($scope.cubeMetaFrame.storage_type === null) {
+            $scope.cubeMetaFrame.storage_type = kylinConfig.getStorageEng();
+        }
 
         var errorInfo = "";
         angular.forEach(errors,function(item){
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index a83d4c9..42e4c34 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -27,6 +27,10 @@ KylinApp.constant('cubeConfig', {
     {name:'MapReduce',value: 2},
     {name:'Spark',value: 4}
   ],
+  storageTypes: [
+    {name: 'HBase', value: 2},
+    {name: 'Parquet', value: 4}
+  ],
   joinTypes: [
     {name: 'Left', value: 'left'},
     {name: 'Inner', value: 'inner'}
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
index 89229d0..a6674da 100755
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -371,7 +371,7 @@
         <!--Cube Engine-->
         <div class="form-group large-popover" style="margin-bottom:30px;">
           <h3 style="margin-left:42px;margin-bottom:30px;">Cube Engine  <i kylinpopover placement="right" title="Cube Engine" template="CubeEngineTip.html" class="fa fa-info-circle"></i></h3>
-          <div class="row" style="margin-left:42px">
+          <div class="row" style="margin-left:42px;margin-bottom:20px;">
             <label class="control-label col-xs-12 col-sm-3 no-padding-right font-color-default"><b>Engine Type :</b></label>
             <div class="col-xs-12 col-sm-6">
               <select style="width: 100%" chosen
@@ -384,6 +384,19 @@
               <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==4">Spark</span>
             </div>
           </div>
+          <div class="row" style="margin-left:42px">
+            <label class="control-label col-xs-12 col-sm-3 no-padding-right font-color-default"><b>Storage Type :</b></label>
+            <div class="col-xs-12 col-sm-6">
+              <select style="width: 100%" chosen
+                      ng-model="cubeMetaFrame.storage_type"
+                      ng-if="state.mode=='edit'"
+                      ng-options="st.value as st.name for st in cubeConfig.storageTypes">
+                <option value="">--Select Storage Type--</option>
+              </select>
+              <span ng-if="state.mode=='view'&&cubeMetaFrame.storage_type==2">HBase</span>
+              <span ng-if="state.mode=='view'&&cubeMetaFrame.storage_type==4">Parquet</span>
+            </div>
+          </div>
         </div>
         <div class="form-group large-popover">
           <h3 style="margin-left:42px">Advanced Dictionaries  <i kylinpopover placement="right" title="Advanced Dictionaries" template="AdvancedDictionariesTip.html" class="fa fa-info-circle"></i></h3>