You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:26 UTC

[kylin] 02/06: KYLIN-3624 Convert sequence files to Parquet in Spark

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5566c5956f9fe2ad1810a2cb0a3f3c61aba2a71c
Author: Yichen Zhou <zh...@gmail.com>
AuthorDate: Thu Oct 11 21:01:53 2018 +0800

    KYLIN-3624 Convert sequence files to Parquet in Spark
---
 assembly/pom.xml                                   |   4 +
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  10 +-
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  19 +-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  10 +-
 examples/test_case_data/sandbox/kylin.properties   |   1 -
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |   2 +-
 .../kylin/storage/parquet/ParquetStorage.java      |   6 +-
 .../kylin/storage/parquet/cube/CubeSparkRPC.java   | 107 ++++
 .../storage/parquet/steps/ParquetJobSteps.java     |  37 ++
 .../storage/parquet/steps/ParquetMROutput.java     | 103 ++++
 .../storage/parquet/steps/ParquetSparkOutput.java  |   6 +-
 .../storage/parquet/steps/ParquetSparkSteps.java   |  66 +++
 .../storage/parquet/steps/SparkCubeParquet.java    | 543 +++++++++++++++++++++
 14 files changed, 891 insertions(+), 24 deletions(-)

diff --git a/assembly/pom.xml b/assembly/pom.xml
index dd3211a..c74f6c8 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -52,6 +52,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-storage-parquet</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
         <dependency>
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 5735a80..d405772 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -52,6 +52,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
     public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
     public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
+    public static final String STEP_NAME_CONVERT_CUBOID_TO_PARQUET = "Convert Cuboid Data to Parquet";
     public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
     public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 11c7d36..d21638a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -338,10 +338,18 @@ public class JobBuilderSupport {
         return getJobWorkingDir(jobId) + "/hbase-conf.xml";
     }
 
-    public String getCounterOuputPath(String jobId) {
+    public String getCounterOutputPath(String jobId) {
         return getRealizationRootPath(jobId) + "/counter";
     }
 
+    public String getParquetOutputPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/parquet/";
+    }
+
+    public String getParquetOutputPath() {
+        return getParquetOutputPath(seg.getLastBuildJobID());
+    }
+
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 41e4e49..62ccf03 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -75,16 +75,11 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         // add materialize lookup tables if needed
         LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
 
-        if (seg.getStorageType() != ID_PARQUET) {
-            outputSide.addStepPhase2_BuildDictionary(result);
-        }
+        outputSide.addStepPhase2_BuildDictionary(result);
 
         // Phase 3: Build Cube
         addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
-
-        if (seg.getStorageType() != ID_PARQUET) {
-            outputSide.addStepPhase3_BuildCube(result);
-        }
+        outputSide.addStepPhase3_BuildCube(result);
 
         // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
@@ -110,7 +105,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
 
         sparkExecutable.setJobId(jobId);
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOuputPath(jobId));
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOutputPath(jobId));
 
         StringBuilder jars = new StringBuilder();
 
@@ -123,11 +118,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
 
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         final SparkExecutable sparkExecutable = new SparkExecutable();
-        if (seg.getStorageType() == ID_PARQUET) {
-            sparkExecutable.setClassName(SparkCubingByLayerParquet.class.getName());
-        } else {
-            sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
-        }
+        sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         configureSparkJob(seg, sparkExecutable, jobId, cuboidRootPath);
         result.addTask(sparkExecutable);
     }
@@ -155,7 +146,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
 
         if (seg.getStorageType() == ID_PARQUET) {
-            sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOuputPath(jobId));
+            sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOutputPath(jobId));
         }
     }
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index ed35f54..99e6a67 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -30,7 +30,9 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.ByteArray;
@@ -48,8 +50,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -126,7 +126,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
 
-        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"), Class.forName("scala.collection.immutable.Set$EmptySet$") };
 
         SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + " segment " + segmentId);
         //serialization conf
@@ -238,8 +238,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
         final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
 
-        IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
-        outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
+        FileOutputFormat.setOutputPath(job, new Path(cuboidOutputPath));
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
 
         prepareOutput(rdd, kylinConfig, cubeSeg, level).mapToPair(
                 new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index e6a6bd6..a89b4da 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -41,7 +41,6 @@ kylin.source.hive.client=cli
 # The metadata store in hbase
 kylin.metadata.url=kylin_metadata@hbase
 
-
 # The storage for final cube file in hbase
 kylin.storage.url=hbase
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index ccab22f..ad83003 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -71,7 +71,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         sparkExecutable.setJars(jars.toString());
 
         sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOuputPath(jobId));
+        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOutputPath(jobId));
 
         return sparkExecutable;
     }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
index cddce23..e05b433 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
@@ -19,12 +19,14 @@
 package org.apache.kylin.storage.parquet;
 
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.spark.ISparkOutput;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.storage.IStorage;
 import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.parquet.cube.CubeStorageQuery;
+import org.apache.kylin.storage.parquet.steps.ParquetMROutput;
 import org.apache.kylin.storage.parquet.steps.ParquetSparkOutput;
 
 public class ParquetStorage implements IStorage {
@@ -42,7 +44,9 @@ public class ParquetStorage implements IStorage {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == ISparkOutput.class) {
             return (I) new ParquetSparkOutput();
-        } else {
+        } else if (engineInterface == IMROutput2.class) {
+            return (I) new ParquetMROutput();
+        } else{
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
     }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
new file mode 100644
index 0000000..1322da8
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStorage;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.parquet.spark.ParquetPayload;
+import org.apache.kylin.storage.parquet.spark.SparkSubmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class CubeSparkRPC implements IGTStorage {
+
+    public static final Logger logger = LoggerFactory.getLogger(CubeSparkRPC.class);
+
+    protected CubeSegment cubeSegment;
+    protected Cuboid cuboid;
+    protected GTInfo gtInfo;
+    protected StorageContext storageContext;
+
+    public CubeSparkRPC(ISegment segment, Cuboid cuboid, GTInfo gtInfo, StorageContext context) {
+        this.cubeSegment = (CubeSegment) segment;
+        this.cuboid = cuboid;
+        this.gtInfo = gtInfo;
+        this.storageContext = context;
+    }
+
+    protected List<Integer> getRequiredParquetColumns(GTScanRequest request) {
+        List<Integer> columnFamilies = Lists.newArrayList();
+
+        for (int i = 0; i < request.getSelectedColBlocks().trueBitCount(); i++) {
+            columnFamilies.add(request.getSelectedColBlocks().trueBitAt(i));
+        }
+
+        return columnFamilies;
+    }
+
+    @Override
+    public IGTScanner getGTScanner(GTScanRequest scanRequest) throws IOException {
+        String scanReqId = Integer.toHexString(System.identityHashCode(scanRequest));
+
+        ParquetPayload.ParquetPayloadBuilder builder = new ParquetPayload.ParquetPayloadBuilder();
+
+        JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "");
+
+        List<List<Long>> layeredCuboids = cubeSegment.getCuboidScheduler().getCuboidsByLayer();
+        int level = 0;
+        for (List<Long> levelCuboids : layeredCuboids) {
+            if (levelCuboids.contains(cuboid.getId())) {
+                 break;
+            }
+            level++;
+        }
+
+        String dataFolderName;
+        String parquetRootPath = jobBuilderSupport.getParquetOutputPath();
+        dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(parquetRootPath, level) + "/" + cuboid.getId();
+
+        builder.setGtScanRequest(scanRequest.toByteArray()).setGtScanRequestId(scanReqId)
+                .setKylinProperties(KylinConfig.getInstanceFromEnv().exportAllToString())
+                .setRealizationId(cubeSegment.getCubeInstance().getName()).setSegmentId(cubeSegment.getUuid())
+                .setDataFolderName(dataFolderName)
+                .setMaxRecordLength(scanRequest.getInfo().getMaxLength())
+                .setParquetColumns(getRequiredParquetColumns(scanRequest))
+                .setRealizationType(RealizationType.CUBE.toString()).setQueryId(QueryContext.current().getQueryId())
+                .setSpillEnabled(cubeSegment.getConfig().getQueryCoprocessorSpillEnabled())
+                .setMaxScanBytes(cubeSegment.getConfig().getPartitionMaxScanBytes())
+                .setStartTime(scanRequest.getStartTime()).setStorageType(cubeSegment.getStorageType());
+
+        ParquetPayload payload = builder.createParquetPayload();
+
+        logger.info("The scan {} for segment {} is ready to be submitted to spark client", scanReqId, cubeSegment);
+
+        return SparkSubmitter.submitParquetTask(scanRequest, payload);
+    }
+
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
new file mode 100644
index 0000000..b47a03a
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+
+/**
+ * Common steps for building cube into Parquet
+ */
+public abstract class ParquetJobSteps extends JobBuilderSupport {
+
+    public ParquetJobSteps(CubeSegment seg) {
+        super(seg, null);
+    }
+
+
+    abstract public AbstractExecutable createConvertToParquetStep(String jobId);
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
new file mode 100644
index 0000000..fe85e24
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IEngineAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Created by Yichen on 10/16/18.
+ */
+public class ParquetMROutput implements IMROutput2 {
+
+    private static final Logger logger = LoggerFactory.getLogger(ParquetMROutput.class);
+
+    @Override
+    public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg) {
+
+        boolean useSpark = seg.getCubeDesc().getEngineType() == IEngineAware.ID_SPARK;
+
+
+        // TODO need refactor
+        if (!useSpark){
+            throw new RuntimeException("Cannot adapt to MR engine");
+        }
+        final ParquetJobSteps steps = new ParquetSparkSteps(seg);
+
+        return new IMRBatchCubingOutputSide2() {
+
+            @Override
+            public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            }
+
+            @Override
+            public IMROutputFormat getOuputFormat() {
+                return new ParquetMROutputFormat();
+            }
+        };
+
+    }
+
+    public static class ParquetMROutputFormat implements IMROutputFormat {
+
+        @Override
+        public void configureJobInput(Job job, String input) throws Exception {
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+        }
+
+        @Override
+        public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler,
+                                       int level) throws Exception {
+
+            Path outputPath = new Path(output);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+        }
+    }
+
+    @Override
+    public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg) {
+        return null;
+    }
+
+    @Override
+    public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(CubeSegment seg) {
+        return null;
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
index 6f82d8b..176afd0 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
@@ -27,14 +27,16 @@ import java.util.List;
 public class ParquetSparkOutput implements ISparkOutput {
     @Override
     public ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
+        final ParquetJobSteps steps = new ParquetSparkSteps(seg);
+
         return new ISparkBatchCubingOutputSide() {
             @Override
             public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
-
             }
 
             @Override
             public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createConvertToParquetStep(jobFlow.getId()));
 
             }
 
@@ -47,6 +49,7 @@ public class ParquetSparkOutput implements ISparkOutput {
 
     @Override
     public ISparkBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
+        final ParquetJobSteps steps = new ParquetSparkSteps(seg);
         return new ISparkBatchMergeOutputSide() {
             @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
@@ -55,6 +58,7 @@ public class ParquetSparkOutput implements ISparkOutput {
 
             @Override
             public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createConvertToParquetStep(jobFlow.getId()));
 
             }
 
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
new file mode 100644
index 0000000..296bc68
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
+import org.apache.kylin.engine.spark.SparkExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+
+public class ParquetSparkSteps extends ParquetJobSteps {
+
+    public ParquetSparkSteps(CubeSegment seg) {
+        super(seg);
+    }
+
+    @Override
+    public AbstractExecutable createConvertToParquetStep(String jobId) {
+
+        String cuboidRootPath = getCuboidRootPath(jobId);
+        String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
+
+        SparkBatchCubingJobBuilder2 jobBuilder2 = new SparkBatchCubingJobBuilder2(seg, null);
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        sparkExecutable.setClassName(SparkCubeParquet.class.getName());
+        sparkExecutable.setParam(SparkCubeParquet.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkCubeParquet.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubeParquet.OPTION_INPUT_PATH.getOpt(), inputPath);
+        sparkExecutable.setParam(SparkCubeParquet.OPTION_OUTPUT_PATH.getOpt(), getParquetOutputPath(jobId));
+        sparkExecutable.setParam(SparkCubeParquet.OPTION_META_URL.getOpt(),
+                jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
+        sparkExecutable.setJobId(jobId);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_PARQUET);
+        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOutputPath(jobId));
+
+        return sparkExecutable;
+    }
+
+
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
new file mode 100644
index 0000000..2a7c1ee
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.storage.parquet.steps;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dimension.AbstractDateDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.FixedLenDimEnc;
+import org.apache.kylin.dimension.FixedLenHexDimEnc;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.spark.KylinSparkJobListener;
+import org.apache.kylin.engine.spark.SparkUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.measure.basic.BigDecimalIngester;
+import org.apache.kylin.measure.basic.DoubleIngester;
+import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+
+public class SparkCubeParquet extends AbstractApplication implements Serializable{
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubeParquet.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+            .isRequired(true).withDescription("Paqruet output path").create(BatchConstants.ARG_OUTPUT);
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+            .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
+
+    private Options options;
+
+    public SparkCubeParquet(){
+        options = new Options();
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_OUTPUT_PATH);
+        options.addOption(OPTION_COUNTER_PATH);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
+
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Text.class, Group.class};
+
+        SparkConf conf = new SparkConf().setAppName("Converting Parquet File for: " + cubeName + " segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        try (JavaSparkContext sc = new JavaSparkContext(conf)){
+            sc.sc().addSparkListener(jobListener);
+            final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+
+            final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+            final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+            final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+
+
+            final FileSystem fs = new Path(inputPath).getFileSystem(sc.hadoopConfiguration());
+
+            final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
+            JavaPairRDD<Text, Text>[] allRDDs = new JavaPairRDD[totalLevels + 1];
+
+            final Job job = Job.getInstance(sConf.get());
+            logger.info("Input path: {}", inputPath);
+            logger.info("Output path: {}", outputPath);
+
+            // Read from cuboid and save to parquet
+            for (int level = 0; level <= totalLevels; level++) {
+                String cuboidPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputPath, level);
+                allRDDs[level] = SparkUtil.parseInputPath(cuboidPath, fs, sc, Text.class, Text.class);
+                saveToParquet(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
+            }
+
+            Map<String, String> counterMap = Maps.newHashMap();
+            counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+            // save counter to hdfs
+            HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+        }
+
+    }
+
+    protected void saveToParquet(JavaPairRDD<Text, Text> rdd, String metaUrl, String cubeName, CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws Exception {
+        final IDimensionEncodingMap dimEncMap = cubeSeg.getDimensionEncodingMap();
+
+        Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSeg.getCubeDesc());
+
+        final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
+        final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
+
+        MessageType schema = cuboidToMessageType(baseCuboid, dimEncMap, cubeSeg.getCubeDesc(), colTypeMap, meaTypeMap);
+
+        logger.info("Schema: {}", schema.toString());
+
+        final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
+
+        logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
+
+        JavaPairRDD<Text, Text> repartitionedRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping));
+
+        String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(parquetOutput, level);
+
+        job.setOutputFormatClass(CustomParquetOutputFormat.class);
+        GroupWriteSupport.setSchema(schema, job.getConfiguration());
+        CustomParquetOutputFormat.setOutputPath(job, new Path(output));
+        CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
+        CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
+
+        JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping))
+                                                .mapToPair(new GenerateGroupRDDFunction(cubeName, cubeSeg.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap));
+
+        groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
+    }
+
+    static class CuboidPartitioner extends Partitioner {
+
+        private CuboidToPartitionMapping mapping;
+
+        public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
+            this.mapping = cuboidToPartitionMapping;
+        }
+
+        @Override
+        public int numPartitions() {
+            return mapping.getNumPartitions();
+        }
+
+        @Override
+        public int getPartition(Object key) {
+            Text textKey = (Text)key;
+            return mapping.getPartitionForCuboidId(textKey.getBytes());
+        }
+    }
+
+    public static class CuboidToPartitionMapping implements Serializable {
+        private Map<Long, List<Integer>> cuboidPartitions;
+        private int partitionNum;
+
+        public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
+            this.cuboidPartitions = cuboidPartitions;
+            int partitions = 0;
+            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+                partitions = partitions + entry.getValue().size();
+            }
+            this.partitionNum = partitions;
+        }
+
+        public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
+            cuboidPartitions = Maps.newHashMap();
+
+            List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
+            CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
+
+            int position = 0;
+            for (Long cuboidId : layeredCuboids) {
+                int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
+                List<Integer> positions = Lists.newArrayListWithCapacity(partition);
+
+                for (int i = position; i < position + partition; i++) {
+                    positions.add(i);
+                }
+
+                cuboidPartitions.put(cuboidId, positions);
+                position = position + partition;
+            }
+
+            this.partitionNum = position;
+        }
+
+        public String serialize() throws JsonProcessingException {
+            return JsonUtil.writeValueAsString(cuboidPartitions);
+        }
+
+        public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
+            Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
+            return new CuboidToPartitionMapping(cuboidPartitions);
+        }
+
+        public int getNumPartitions() {
+            return this.partitionNum;
+        }
+
+        public long getCuboidIdByPartition(int partition) {
+            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+                if (entry.getValue().contains(partition)) {
+                    return entry.getKey();
+                }
+            }
+
+            throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
+        }
+
+        public int getPartitionForCuboidId(byte[] key) {
+            long cuboidId = Bytes.toLong(key, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+            List<Integer> partitions = cuboidPartitions.get(cuboidId);
+            int partitionKey = mod(key, RowConstants.ROWKEY_COL_DEFAULT_LENGTH, key.length, partitions.size());
+
+            return partitions.get(partitionKey);
+        }
+
+        private int mod(byte[] src, int start, int end, int total) {
+            int sum = Bytes.hashBytes(src, start, end - start);
+            int mod = sum % total;
+            if (mod < 0)
+                mod += total;
+
+            return mod;
+        }
+
+        public int getPartitionNumForCuboidId(long cuboidId) {
+            return cuboidPartitions.get(cuboidId).size();
+        }
+
+        public String getPartitionFilePrefix(int partition) {
+            String prefix = "cuboid_";
+            long cuboid = getCuboidIdByPartition(partition);
+            int partNum = partition % getPartitionNumForCuboidId(cuboid);
+            prefix = prefix + cuboid + "_part" + partNum;
+
+            return prefix;
+        }
+
+        private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
+            double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
+            float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+            int partition = (int) (cuboidSize / (rddCut * 10));
+            partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+            partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+            return partition;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+                sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
+            }
+
+            return sb.toString();
+        }
+    }
+
+    public static class CustomParquetOutputFormat extends ParquetOutputFormat {
+        public static final String CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
+
+        @Override
+        public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
+            FileOutputCommitter committer = (FileOutputCommitter)this.getOutputCommitter(context);
+            TaskID taskId = context.getTaskAttemptID().getTaskID();
+            int partition = taskId.getId();
+
+            CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(CUBOID_TO_PARTITION_MAPPING));
+
+            return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
+        }
+
+        public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
+            String jsonStr = cuboidToPartitionMapping.serialize();
+
+            job.getConfiguration().set(CUBOID_TO_PARTITION_MAPPING, jsonStr);
+        }
+    }
+
+    static class GenerateGroupRDDFunction implements PairFunction<Tuple2<Text, Text>, Void, Group> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String segmentId;
+        private String metaUrl;
+        private SerializableConfiguration conf;
+        private List<MeasureDesc> measureDescs;
+        private RowKeyDecoder decoder;
+        private Map<TblColRef, String> colTypeMap;
+        private Map<MeasureDesc, String> meaTypeMap;
+        private GroupFactory factory;
+        private BufferedMeasureCodec measureCodec;
+
+        public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+            this.cubeName = cubeName;
+            this.segmentId = segmentId;
+            this.metaUrl = metaurl;
+            this.conf = conf;
+            this.colTypeMap = colTypeMap;
+            this.meaTypeMap = meaTypeMap;
+        }
+
+        private void init() {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+            CubeDesc cubeDesc = cubeInstance.getDescriptor();
+            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+            measureDescs = cubeDesc.getMeasures();
+            decoder = new RowKeyDecoder(cubeSegment);
+            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
+            measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+        }
+
+        @Override
+        public Tuple2<Void, Group> call(Tuple2<Text, Text> tuple) throws Exception {
+
+            logger.debug("call: transfer Text to byte[]");
+            if (initialized == false) {
+                synchronized (SparkCubeParquet.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
+
+            long cuboid = decoder.decode(tuple._1.getBytes());
+            List<String> values = decoder.getValues();
+            List<TblColRef> columns = decoder.getColumns();
+
+            Group group = factory.newGroup();
+
+            // for check
+            group.append("cuboidId", cuboid);
+
+            for (int i = 0; i < columns.size(); i++) {
+                TblColRef column = columns.get(i);
+                parseColValue(group, column, values.get(i));
+            }
+
+
+            byte[] encodedBytes = tuple._2().copyBytes();
+            int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
+
+            int valueOffset = 0;
+            for (int i = 0; i < valueLengths.length; ++i) {
+                MeasureDesc measureDesc = measureDescs.get(i);
+                parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i]);
+                valueOffset += valueLengths[i];
+            }
+
+            return new Tuple2<>(null, group);
+        }
+
+        private void parseColValue(final Group group, final TblColRef colRef, final String value) {
+            if (value==null) {
+                logger.info("value is null");
+                return;
+            }
+            switch (colTypeMap.get(colRef)) {
+                case "int":
+                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
+                    break;
+                case "long":
+                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
+                    break;
+                default:
+                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
+                    break;
+            }
+        }
+
+        private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) throws IOException {
+            if (value==null) {
+                logger.info("value is null");
+                return;
+            }
+            switch (meaTypeMap.get(measureDesc)) {
+                case "long":
+                    group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
+                    break;
+                case "double":
+                    group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
+                    break;
+                default:
+                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
+                    break;
+            }
+        }
+    }
+
+    private MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+
+        List<TblColRef> colRefs = cuboid.getColumns();
+
+        builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId");
+
+        for (TblColRef colRef : colRefs) {
+            DimensionEncoding dimEnc = dimEncMap.get(colRef);
+
+            if (dimEnc instanceof AbstractDateDimEnc) {
+                builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+                colTypeMap.put(colRef, "long");
+            } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
+                org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
+                if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
+                    builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+                    colTypeMap.put(colRef, "long");
+                } else {
+                    // stringFamily && default
+                    builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
+                    colTypeMap.put(colRef, "string");
+                }
+            } else {
+                builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
+                colTypeMap.put(colRef, "int");
+            }
+        }
+
+        MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
+            org.apache.kylin.metadata.datatype.DataType meaDataType = measureDesc.getFunction().getReturnDataType();
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
+
+            if (measureType instanceof BasicMeasureType) {
+                MeasureIngester measureIngester = aggrIngesters[i];
+                if (measureIngester instanceof LongIngester) {
+                    builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "long");
+                } else if (measureIngester instanceof DoubleIngester) {
+                    builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "double");
+                } else if (measureIngester instanceof BigDecimalIngester) {
+                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "decimal");
+                } else {
+                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
+                    meaTypeMap.put(measureDesc, "binary");
+                }
+            } else {
+                builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
+                meaTypeMap.put(measureDesc, "binary");
+            }
+        }
+
+        return builder.named(String.valueOf(cuboid.getId()));
+    }
+
+    private String getColName(TblColRef colRef) {
+        return colRef.getTableAlias() + "_" + colRef.getName();
+    }
+}