You are viewing a plain-text version of this content; the canonical link to the original message is provided in the mailing-list archive.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:25 UTC
[kylin] 01/06: KYLIN-3622,KYLIN-3624 Cube layout in Parquet;
convert cube data to parquet in Spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9954d36e39f5d0f3868a712bfc9e1bbde8a071ce
Author: chao long <wa...@qq.com>
AuthorDate: Sat Sep 29 14:05:30 2018 +0800
KYLIN-3622,KYLIN-3624 Cube layout in Parquet; convert cube data to parquet in Spark
---
.../src/main/resources/kylin-defaults.properties | 26 ++
.../org/apache/kylin/cube/kv/RowKeyColumnIO.java | 3 +
.../apache/kylin/measure/BufferedMeasureCodec.java | 4 +
.../apache/kylin/metadata/model/IStorageAware.java | 1 +
.../kylin/engine/mr/common/CubeStatsReader.java | 5 +
.../engine/spark/SparkBatchCubingJobBuilder2.java | 21 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 20 +-
.../engine/spark/SparkCubingByLayerParquet.java | 421 +++++++++++++++++++++
pom.xml | 6 +
server-base/pom.xml | 4 +
server/pom.xml | 6 +
.../kylin/storage/parquet/ParquetStorage.java | 49 +++
.../storage/parquet/cube/CubeStorageQuery.java | 33 +-
.../storage/parquet/steps/ParquetSparkOutput.java | 72 ++++
webapp/app/js/controllers/cubeSchema.js | 9 +-
webapp/app/js/model/cubeConfig.js | 4 +
.../partials/cubeDesigner/advanced_settings.html | 15 +-
17 files changed, 684 insertions(+), 15 deletions(-)
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 6f2db9a..6238e44 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -61,6 +61,9 @@ kylin.restclient.connection.default-max-per-route=20
#max connections of one rest-client
kylin.restclient.connection.max-total=200
+## Parquet storage
+kylin.storage.provider.4=org.apache.kylin.storage.parquet.ParquetStorage
+
### PUBLIC CONFIG ###
kylin.engine.default=2
kylin.storage.default=2
@@ -351,3 +354,26 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
#kylin.source.jdbc.pass=
#kylin.source.jdbc.sqoop-home=
#kylin.source.jdbc.filed-delimiter=|
+
+kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
+## for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kylin.storage.columnar.spark-conf" and append here
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dzipkin.collector-hostname=${ZIPKIN_HOSTNAME} -Dzipkin.collector-port=${ZIPKIN_SCRIBE_PORT} -DinfluxDB.address=${INFLUXDB_ADDRESS} -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkap.spark.identifier=${KAP_SPARK_IDENTIFIER} -Dkap.hdfs.working.dir=${KAP_HDFS_WORKING_DIR} -Dkap.metadata.url=${KAP_METADATA_IDENTIFIER} -XX:MaxDirectMemorySize=896M -Dsparder.dict.cache.size=${SPARDER [...]
+kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
+#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
+kylin.storage.columnar.spark-conf.spark.driver.memory=512m
+kylin.storage.columnar.spark-conf.spark.executor.memory=512m
+kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=512
+kylin.storage.columnar.spark-conf.yarn.am.memory=512m
+kylin.storage.columnar.spark-conf.spark.executor.cores=1
+kylin.storage.columnar.spark-conf.spark.executor.instances=1
+kylin.storage.columnar.spark-conf.spark.task.maxFailures=1
+kylin.storage.columnar.spark-conf.spark.ui.port=4041
+kylin.storage.columnar.spark-conf.spark.locality.wait=0s
+kylin.storage.columnar.spark-conf.spark.sql.dialect=hiveql
+kylin.storage.columnar.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+kylin.storage.columnar.spark-conf.hive.execution.engine=MR
+kylin.storage.columnar.spark-conf.spark.scheduler.listenerbus.eventqueue.size=100000000
+kylin.storage.columnar.spark-conf.spark.master=yarn-client
+kylin.storage.columnar.spark-conf.spark.broadcast.compress=false
+
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
index 65911a0..b0efc91 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -18,6 +18,7 @@
package org.apache.kylin.cube.kv;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.dimension.DictionaryDimEnc;
import org.apache.kylin.dimension.DimensionEncoding;
@@ -57,6 +58,8 @@ public class RowKeyColumnIO implements java.io.Serializable {
public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
DimensionEncoding dimEnc = dimEncMap.get(col);
+ if (dimEnc instanceof DictionaryDimEnc)
+ return String.valueOf(BytesUtil.readUnsigned(bytes, offset, length));
return dimEnc.decode(bytes, offset, length);
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
index 44e5708..ec8069f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
@@ -40,6 +40,10 @@ public class BufferedMeasureCodec implements java.io.Serializable {
private transient ByteBuffer buf;
final private int[] measureSizes;
+ public MeasureCodec getCodec() {
+ return codec;
+ }
+
public BufferedMeasureCodec(Collection<MeasureDesc> measureDescs) {
this.codec = new MeasureCodec(measureDescs);
this.measureSizes = new int[codec.getMeasuresCount()];
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
index e552574..3e7de24 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
@@ -23,6 +23,7 @@ public interface IStorageAware {
public static final int ID_HBASE = 0;
public static final int ID_HYBRID = 1;
public static final int ID_SHARDED_HBASE = 2;
+ public static final int ID_PARQUET = 4;
int getStorageType();
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 102995e..f6579ef 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -307,6 +307,11 @@ public class CubeStatsReader {
return ret;
}
+ public double estimateCuboidSize(long cuboidId) {
+ Map<Long, Double> cuboidSizeMap = getCuboidSizeMap();
+ return cuboidSizeMap.get(cuboidId);
+ }
+
public List<Long> getCuboidsByLayer(int level) {
if (cuboidScheduler == null) {
throw new UnsupportedOperationException("cuboid scheduler is null");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 3f3c14d..41e4e49 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -35,6 +35,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kylin.metadata.model.IStorageAware.ID_PARQUET;
+
/**
*/
public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -73,11 +75,16 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
// add materialize lookup tables if needed
LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
- outputSide.addStepPhase2_BuildDictionary(result);
+ if (seg.getStorageType() != ID_PARQUET) {
+ outputSide.addStepPhase2_BuildDictionary(result);
+ }
// Phase 3: Build Cube
addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
- outputSide.addStepPhase3_BuildCube(result);
+
+ if (seg.getStorageType() != ID_PARQUET) {
+ outputSide.addStepPhase3_BuildCube(result);
+ }
// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
@@ -116,7 +123,11 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final SparkExecutable sparkExecutable = new SparkExecutable();
- sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
+ if (seg.getStorageType() == ID_PARQUET) {
+ sparkExecutable.setClassName(SparkCubingByLayerParquet.class.getName());
+ } else {
+ sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
+ }
configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath);
result.addTask(sparkExecutable);
}
@@ -142,6 +153,10 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
sparkExecutable.setJars(jars.toString());
sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
+
+ if (seg.getStorageType() == ID_PARQUET) {
+ sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOuputPath(jobId));
+ }
}
public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 2f453c5..ed35f54 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -23,7 +23,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -55,6 +57,7 @@ import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureAggregators;
import org.apache.kylin.measure.MeasureIngester;
@@ -66,6 +69,7 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +95,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
.withDescription("Hive Intermediate Table").create("hiveTable");
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+ .isRequired(false).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
private Options options;
@@ -102,6 +108,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_COUNTER_PATH);
}
@Override
@@ -117,6 +124,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
@@ -128,6 +136,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
KylinSparkJobListener jobListener = new KylinSparkJobListener();
JavaSparkContext sc = new JavaSparkContext(conf);
+
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
@@ -205,8 +214,17 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
allRDDs[totalLevels].unpersist();
logger.info("Finished on calculating all level cuboids.");
+
+ // only parquet storage work
logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
- //HadoopUtil.deleteHDFSMeta(metaUrl);
+
+ if (counterPath != null) {
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+ }
}
protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
new file mode 100644
index 0000000..d8fccf9
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dimension.AbstractDateDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.FixedLenDimEnc;
+import org.apache.kylin.dimension.FixedLenHexDimEnc;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.measure.basic.BigDecimalIngester;
+import org.apache.kylin.measure.basic.DoubleIngester;
+import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class SparkCubingByLayerParquet extends SparkCubingByLayer {
+ @Override
+ protected void saveToHDFS(JavaPairRDD<ByteArray, Object[]> rdd, String metaUrl, String cubeName, CubeSegment cubeSeg, String hdfsBaseLocation, int level, Job job, KylinConfig kylinConfig) throws Exception {
+ final IDimensionEncodingMap dimEncMap = cubeSeg.getDimensionEncodingMap();
+
+ Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSeg.getCubeDesc());
+
+ final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
+ final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
+
+ MessageType schema = cuboidToMessageType(baseCuboid, dimEncMap, cubeSeg.getCubeDesc(), colTypeMap, meaTypeMap);
+
+ logger.info("Schema: {}", schema.toString());
+
+ final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
+
+ logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
+
+ JavaPairRDD<ByteArray, Object[]> repartitionedRDD = rdd.repartitionAndSortWithinPartitions(new CuboidPartitioner(cuboidToPartitionMapping));
+
+ String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+
+ job.setOutputFormatClass(CustomParquetOutputFormat.class);
+ GroupWriteSupport.setSchema(schema, job.getConfiguration());
+ CustomParquetOutputFormat.setOutputPath(job, new Path(output));
+ CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
+ CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
+
+ JavaPairRDD<Void, Group> groupRDD = repartitionedRDD.mapToPair(new GenerateGroupRDDFunction(cubeName, cubeSeg.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap));
+
+ groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
+ }
+
+ static class CuboidPartitioner extends Partitioner {
+
+ private CuboidToPartitionMapping mapping;
+
+ public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
+ this.mapping = cuboidToPartitionMapping;
+ }
+
+ @Override
+ public int numPartitions() {
+ return mapping.getNumPartitions();
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ ByteArray byteArray = (ByteArray) key;
+ long cuboidId = Bytes.toLong(byteArray.array(), RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+
+ return mapping.getRandomPartitionForCuboidId(cuboidId);
+ }
+ }
+
+ public static class CuboidToPartitionMapping implements Serializable {
+ private Map<Long, List<Integer>> cuboidPartitions;
+ private int partitionNum;
+
+ public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
+ this.cuboidPartitions = cuboidPartitions;
+ int partitions = 0;
+ for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+ partitions = partitions + entry.getValue().size();
+ }
+ this.partitionNum = partitions;
+ }
+
+ public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
+ cuboidPartitions = Maps.newHashMap();
+
+ List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
+
+ int position = 0;
+ for (Long cuboidId : layeredCuboids) {
+ int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
+ List<Integer> positions = Lists.newArrayListWithCapacity(partition);
+
+ for (int i = position; i < position + partition; i++) {
+ positions.add(i);
+ }
+
+ cuboidPartitions.put(cuboidId, positions);
+ position = position + partition;
+ }
+
+ this.partitionNum = position;
+ }
+
+ public String serialize() throws JsonProcessingException {
+ return JsonUtil.writeValueAsString(cuboidPartitions);
+ }
+
+ public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
+ Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
+ return new CuboidToPartitionMapping(cuboidPartitions);
+ }
+
+ public int getNumPartitions() {
+ return this.partitionNum;
+ }
+
+ public long getCuboidIdByPartition(int partition) {
+ for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+ if (entry.getValue().contains(partition)) {
+ return entry.getKey();
+ }
+ }
+
+ throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
+ }
+
+ public int getRandomPartitionForCuboidId(long cuboidId) {
+ List<Integer> partitions = cuboidPartitions.get(cuboidId);
+ return partitions.get(new Random().nextInt(partitions.size()));
+ }
+
+ public int getPartitionNumForCuboidId(long cuboidId) {
+ return cuboidPartitions.get(cuboidId).size();
+ }
+
+ public String getPartitionFilePrefix(int partition) {
+ String prefix = "cuboid_";
+ long cuboid = getCuboidIdByPartition(partition);
+ int partNum = partition % getPartitionNumForCuboidId(cuboid);
+ prefix = prefix + cuboid + "_part" + partNum;
+
+ return prefix;
+ }
+
+ private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
+ double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
+ float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+ int partition = (int) (cuboidSize / rddCut);
+ partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+ partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+ return partition;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+ sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
+ }
+
+ return sb.toString();
+ }
+ }
+
+ public static class CustomParquetOutputFormat extends ParquetOutputFormat {
+ public static final String CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
+
+ @Override
+ public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
+ FileOutputCommitter committer = (FileOutputCommitter)this.getOutputCommitter(context);
+ TaskID taskId = context.getTaskAttemptID().getTaskID();
+ int partition = taskId.getId();
+
+ CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(CUBOID_TO_PARTITION_MAPPING));
+
+ return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
+ }
+
+ public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
+ String jsonStr = cuboidToPartitionMapping.serialize();
+
+ job.getConfiguration().set(CUBOID_TO_PARTITION_MAPPING, jsonStr);
+ }
+ }
+
+ static class GenerateGroupRDDFunction implements PairFunction<Tuple2<ByteArray, Object[]>, Void, Group> {
+ private volatile transient boolean initialized = false;
+ private String cubeName;
+ private String segmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private List<MeasureDesc> measureDescs;
+ private RowKeyDecoder decoder;
+ private Map<TblColRef, String> colTypeMap;
+ private Map<MeasureDesc, String> meaTypeMap;
+ private GroupFactory factory;
+ private BufferedMeasureCodec measureCodec;
+
+ public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+ this.cubeName = cubeName;
+ this.segmentId = segmentId;
+ this.metaUrl = metaurl;
+ this.conf = conf;
+ this.colTypeMap = colTypeMap;
+ this.meaTypeMap = meaTypeMap;
+ }
+
+ private void init() {
+ KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ measureDescs = cubeDesc.getMeasures();
+ decoder = new RowKeyDecoder(cubeSegment);
+ factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
+ measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+ }
+
+ @Override
+ public Tuple2<Void, Group> call(Tuple2<ByteArray, Object[]> tuple) throws Exception {
+ if (initialized == false) {
+ synchronized (SparkCubingByLayer.class) {
+ if (initialized == false) {
+ init();
+ }
+ }
+ }
+
+ long cuboid = decoder.decode(tuple._1.array());
+ List<String> values = decoder.getValues();
+ List<TblColRef> columns = decoder.getColumns();
+
+ Group group = factory.newGroup();
+
+ // for check
+ group.append("cuboidId", cuboid);
+
+ for (int i = 0; i < columns.size(); i++) {
+ TblColRef column = columns.get(i);
+ parseColValue(group, column, values.get(i));
+ }
+
+ ByteBuffer valueBuf = measureCodec.encode(tuple._2());
+ byte[] encodedBytes = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+
+ int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
+
+ int valueOffset = 0;
+ for (int i = 0; i < valueLengths.length; ++i) {
+ MeasureDesc measureDesc = measureDescs.get(i);
+ parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i]);
+ valueOffset += valueLengths[i];
+ }
+
+ return new Tuple2<>(null, group);
+ }
+
+ private void parseColValue(final Group group, final TblColRef colRef, final String value) {
+ switch (colTypeMap.get(colRef)) {
+ case "int":
+ group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
+ break;
+ case "long":
+ group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
+ break;
+ default:
+ group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
+ break;
+ }
+ }
+
+ private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) {
+ switch (meaTypeMap.get(measureDesc)) {
+ case "long":
+ group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
+ break;
+ case "double":
+ group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
+ break;
+ default:
+ group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
+ break;
+ }
+ }
+ }
+
+ private MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+
+ List<TblColRef> colRefs = cuboid.getColumns();
+
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId");
+
+ for (TblColRef colRef : colRefs) {
+ DimensionEncoding dimEnc = dimEncMap.get(colRef);
+
+ if (dimEnc instanceof AbstractDateDimEnc) {
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+ colTypeMap.put(colRef, "long");
+ } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
+ org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
+ if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+ colTypeMap.put(colRef, "long");
+ } else {
+ // stringFamily && default
+ builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
+ colTypeMap.put(colRef, "string");
+ }
+ } else {
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
+ colTypeMap.put(colRef, "int");
+ }
+ }
+
+ MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+
+ for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+ MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
+ org.apache.kylin.metadata.datatype.DataType meaDataType = measureDesc.getFunction().getReturnDataType();
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
+
+ if (measureType instanceof BasicMeasureType) {
+ MeasureIngester measureIngester = aggrIngesters[i];
+ if (measureIngester instanceof LongIngester) {
+ builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(measureDesc.getName());
+ meaTypeMap.put(measureDesc, "long");
+ } else if (measureIngester instanceof DoubleIngester) {
+ builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(measureDesc.getName());
+ meaTypeMap.put(measureDesc, "double");
+ } else if (measureIngester instanceof BigDecimalIngester) {
+ builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(measureDesc.getName());
+ meaTypeMap.put(measureDesc, "decimal");
+ } else {
+ builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
+ meaTypeMap.put(measureDesc, "binary");
+ }
+ } else {
+ builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
+ meaTypeMap.put(measureDesc, "binary");
+ }
+ }
+
+ return builder.named(String.valueOf(cuboid.getId()));
+ }
+
+ private String getColName(TblColRef colRef) {
+ return colRef.getTableAlias() + "_" + colRef.getName();
+ }
+}
diff --git a/pom.xml b/pom.xml
index 680144f..3585927 100644
--- a/pom.xml
+++ b/pom.xml
@@ -359,6 +359,12 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-storage-parquet</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-server-base</artifactId>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index 4cd3f76..29b193b 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -66,6 +66,10 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-storage-parquet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-source-hive</artifactId>
</dependency>
<dependency>
diff --git a/server/pom.xml b/server/pom.xml
index b1365a7..a898eff 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -102,6 +102,12 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-storage-parquet</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-server-base</artifactId>
<type>test-jar</type>
<scope>test</scope>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
new file mode 100644
index 0000000..cddce23
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.spark.ISparkOutput;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.IStorage;
+import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.parquet.cube.CubeStorageQuery;
+import org.apache.kylin.storage.parquet.steps.ParquetSparkOutput;
+
+public class ParquetStorage implements IStorage {
+ @Override
+ public IStorageQuery createQuery(IRealization realization) {
+ if (realization.getType() == RealizationType.CUBE) {
+ return new CubeStorageQuery((CubeInstance) realization);
+ } else {
+ throw new IllegalStateException(
+ "Unsupported realization type for ParquetStorage: " + realization.getType());
+ }
+ }
+
+ @Override
+ public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+ if (engineInterface == ISparkOutput.class) {
+ return (I) new ParquetSparkOutput();
+ } else {
+ throw new RuntimeException("Cannot adapt to " + engineInterface);
+ }
+ }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
similarity index 50%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
copy to storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
index e552574..6a3ad59 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IStorageAware.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
@@ -6,23 +6,38 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
-package org.apache.kylin.metadata.model;
+package org.apache.kylin.storage.parquet.cube;
-public interface IStorageAware {
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
- public static final int ID_HBASE = 0;
- public static final int ID_HYBRID = 1;
- public static final int ID_SHARDED_HBASE = 2;
+public class CubeStorageQuery extends GTCubeStorageQueryBase {
- int getStorageType();
+ public CubeStorageQuery(CubeInstance cube) {
+ super(cube);
+ }
+
+ @Override
+ public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+ return super.search(context, sqlDigest, returnTupleInfo);
+ }
+
+ @Override
+ protected String getGTStorage() {
+ return null;
+ }
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
new file mode 100644
index 0000000..6f82d8b
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.spark.ISparkOutput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+import java.util.List;
+
+public class ParquetSparkOutput implements ISparkOutput {
+ @Override
+ public ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+ return new ISparkBatchCubingOutputSide() {
+ @Override
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+
+ }
+ };
+ }
+
+ @Override
+ public ISparkBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+ return new ISparkBatchMergeOutputSide() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+
+ }
+
+ @Override
+ public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
+
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+
+ }
+ };
+ }
+
+ @Override
+ public ISparkBatchOptimizeOutputSide getBatchOptimizeOutputSide(CubeSegment seg) {
+ return null;
+ }
+}
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index 5eed9b9..0ac8a96 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -18,7 +18,7 @@
'use strict';
-KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager,StreamingService,CubeService,VdmUtil) {
+KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserService,modelsManager, ProjectService, AuthenticationService,$filter,ModelService,MetaModel,CubeDescModel,CubeList,TableModel,ProjectModel,ModelDescService,SweetAlert,cubesManager,StreamingService,CubeService,VdmUtil, kylinConfig) {
$scope.modelsManager = modelsManager;
$scope.cubesManager = cubesManager;
$scope.projects = [];
@@ -419,6 +419,13 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
}
});
+ // set default value for null type
+ if ($scope.cubeMetaFrame.engine_type === null) {
+ $scope.cubeMetaFrame.engine_type = kylinConfig.getCubeEng();
+ }
+ if ($scope.cubeMetaFrame.storage_type === null) {
+ $scope.cubeMetaFrame.storage_type = kylinConfig.getStorageEng();
+ }
var errorInfo = "";
angular.forEach(errors,function(item){
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index a83d4c9..42e4c34 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -27,6 +27,10 @@ KylinApp.constant('cubeConfig', {
{name:'MapReduce',value: 2},
{name:'Spark',value: 4}
],
+ storageTypes: [
+ {name: 'HBase', value: 2},
+ {name: 'Parquet', value: 4}
+ ],
joinTypes: [
{name: 'Left', value: 'left'},
{name: 'Inner', value: 'inner'}
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
index 89229d0..a6674da 100755
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -371,7 +371,7 @@
<!--Cube Engine-->
<div class="form-group large-popover" style="margin-bottom:30px;">
<h3 style="margin-left:42px;margin-bottom:30px;">Cube Engine <i kylinpopover placement="right" title="Cube Engine" template="CubeEngineTip.html" class="fa fa-info-circle"></i></h3>
- <div class="row" style="margin-left:42px">
+ <div class="row" style="margin-left:42px;margin-bottom:20px;">
<label class="control-label col-xs-12 col-sm-3 no-padding-right font-color-default"><b>Engine Type :</b></label>
<div class="col-xs-12 col-sm-6">
<select style="width: 100%" chosen
@@ -384,6 +384,19 @@
<span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==4">Spark</span>
</div>
</div>
+ <div class="row" style="margin-left:42px">
+ <label class="control-label col-xs-12 col-sm-3 no-padding-right font-color-default"><b>Storage Type :</b></label>
+ <div class="col-xs-12 col-sm-6">
+ <select style="width: 100%" chosen
+ ng-model="cubeMetaFrame.storage_type"
+ ng-if="state.mode=='edit'"
+ ng-options="st.value as st.name for st in cubeConfig.storageTypes">
+ <option value="">--Select Storage Type--</option>
+ </select>
+ <span ng-if="state.mode=='view'&&cubeMetaFrame.storage_type==2">HBase</span>
+ <span ng-if="state.mode=='view'&&cubeMetaFrame.storage_type==4">Parquet</span>
+ </div>
+ </div>
</div>
<div class="form-group large-popover">
<h3 style="margin-left:42px">Advanced Dictionaries <i kylinpopover placement="right" title="Advanced Dictionaries" template="AdvancedDictionariesTip.html" class="fa fa-info-circle"></i></h3>