Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:28 UTC

[kylin] 04/06: KYLIN-3623 Convert cuboid to Parquet in MR

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bbf9d256436a921299526e14025f5cbf6071a619
Author: Yichen Zhou <zh...@gmail.com>
AuthorDate: Tue Nov 13 14:51:35 2018 +0800

    KYLIN-3623 Convert cuboid to Parquet in MR
---
 core-common/pom.xml                                |   2 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  16 +
 .../apache/kylin/cube/cuboid/CuboidScheduler.java  |  29 +-
 .../java/org/apache/kylin/gridtable/GTRecord.java  |   1 +
 .../kylin/engine/mr/common/BatchConstants.java     |   1 +
 .../java/org/apache/spark/sql/KylinSession.scala   |   1 -
 .../java/org/apache/spark/sql/SparderEnv.scala     |   8 -
 storage-parquet/pom.xml                            |  12 +
 .../kylin/storage/parquet/cube/CubeSparkRPC.java   |  12 +-
 .../storage/parquet/spark/ParquetPayload.java      |   1 +
 .../parquet/steps/ConvertToParquetReducer.java     |  98 ++++++
 .../parquet/steps/CuboidToPartitionMapping.java    | 165 ++++++++++
 .../storage/parquet/steps/MRCubeParquetJob.java    | 168 ++++++++++
 .../storage/parquet/steps/ParquetConvertor.java    | 283 +++++++++++++++++
 .../storage/parquet/steps/ParquetJobSteps.java     |   7 +-
 .../storage/parquet/steps/ParquetMROutput.java     |  78 ++++-
 .../storage/parquet/steps/ParquetMRSteps.java      |  59 ++++
 .../storage/parquet/steps/ParquetSparkOutput.java  |   4 +-
 .../storage/parquet/steps/ParquetSparkSteps.java   |   2 +-
 .../storage/parquet/steps/SparkCubeParquet.java    | 338 ++-------------------
 .../java/org/apache/kylin/ext/ItClassLoader.java   |   4 +-
 .../org/apache/kylin/ext/ItSparkClassLoader.java   |  10 +-
 .../org/apache/kylin/ext/SparkClassLoader.java     |  25 +-
 .../org/apache/kylin/ext/TomcatClassLoader.java    |  19 +-
 24 files changed, 966 insertions(+), 377 deletions(-)

diff --git a/core-common/pom.xml b/core-common/pom.xml
index 594e39b..9e1e3ac 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -104,4 +104,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6092834..e3e3e03 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1121,6 +1121,22 @@ abstract public class KylinConfigBase implements Serializable {
     }
 
     // ============================================================================
+    // STORAGE.Parquet
+    // ============================================================================
+
+    public float getParquetFileSizeMB() {
+        return Integer.parseInt(getOptional("kylin.storage.parquet.file-size-mb", "100"));
+    }
+
+    public int getParquetMinPartitions() {
+        return Integer.parseInt(getOptional("kylin.storage.parquet.min-partitions", "1"));
+    }
+
+    public int getParquetMaxPartitions() {
+        return Integer.parseInt(getOptional("kylin.storage.parquet.max-partitions", "5000"));
+    }
+
+    // ============================================================================
     // ENGINE.MR
     // ============================================================================
 
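Note on the three kylin.storage.parquet.* options added above: a cuboid's estimated size in MB is divided by kylin.storage.parquet.file-size-mb and the result is clamped to the min/max partition counts, which decides how many parquet files (and reduce tasks) each cuboid gets. A sketch of that arithmetic, mirroring estimateCuboidPartitionNum() in CuboidToPartitionMapping further down:

    // Sketch only (not part of the patch): how the three options combine.
    int estimatePartitions(double cuboidSizeMB, KylinConfig config) {
        int partitions = (int) (cuboidSizeMB / config.getParquetFileSizeMB());
        partitions = Math.max(config.getParquetMinPartitions(), partitions);
        partitions = Math.min(config.getParquetMaxPartitions(), partitions);
        return partitions;
    }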
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 17096f6..c1791e1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -20,8 +20,10 @@ package org.apache.kylin.cube.cuboid;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.cube.model.AggregationGroup;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -71,6 +73,8 @@ abstract public class CuboidScheduler {
     
     private transient List<List<Long>> cuboidsByLayer;
 
+    private transient Map<Long, Integer> cuboidLayerMap;
+
     public long getBaseCuboidId() {
         return Cuboid.getBaseCuboidId(cubeDesc);
     }
@@ -125,7 +129,30 @@ abstract public class CuboidScheduler {
     public int getBuildLevel() {
         return getCuboidsByLayer().size() - 1;
     }
-    
+
+    public Map<Long, Integer> getCuboidLayerMap() {
+        if (cuboidLayerMap != null){
+            return cuboidLayerMap;
+        }
+        cuboidLayerMap = Maps.newHashMap();
+
+        if (cuboidsByLayer == null) {
+            cuboidsByLayer = getCuboidsByLayer();
+        }
+
+        for (int layerIndex = 0; layerIndex <= getBuildLevel(); layerIndex++){
+            List<Long> layeredCuboids = cuboidsByLayer.get(layerIndex);
+            for (Long cuboidId : layeredCuboids){
+                cuboidLayerMap.put(cuboidId, layerIndex);
+            }
+        }
+
+        int size = getAllCuboidIds().size();
+        int totalNum = cuboidLayerMap.size();
+        Preconditions.checkState(totalNum == size, "total Num: " + totalNum + " actual size: " + size);
+        return cuboidLayerMap;
+    }
+
     /** Returns the key for what this cuboid scheduler responsible for. */
     public String getCuboidCacheKey() {
         return cubeDesc.getName();
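getCuboidLayerMap() is simply the inverse view of getCuboidsByLayer(): each cuboid id maps to the index of the layer it is built in, so a reducer can recover the build level from the cuboid id embedded in a row key. A minimal illustration with made-up cuboid ids:

    // Sketch only: if layer 0 = [255] and layer 1 = [254, 253],
    // then getCuboidLayerMap() returns {255=0, 254=1, 253=1}.
    Map<Long, Integer> layerMap = cubeSegment.getCuboidScheduler().getCuboidLayerMap();
    int level = layerMap.get(254L);   // 1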
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index d7be088..24278c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -113,6 +113,7 @@ public class GTRecord implements Comparable<GTRecord> {
     }
 
     /** set record to the codes of specified values, reuse given space to hold the codes */
+    @SuppressWarnings("checkstyle:BooleanExpressionComplexity")
     public GTRecord setValuesParquet(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
         assert selectedCols.cardinality() == values.length;
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 66da1b2..33264c8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -107,6 +107,7 @@ public interface BatchConstants {
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
     String ARG_COUNTER_OUPUT = "counterOutput";
+    String ARG_CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
 
     /**
      * logger and counter
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
index 20f86e5..72c4ba5 100644
--- a/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
+++ b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
@@ -107,7 +107,6 @@ object KylinSession extends Logging {
         sparkContext.addSparkListener(new SparkListener {
           override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
             SparkSession.setDefaultSession(null)
-            SparkSession.sqlListener.set(null)
           }
         })
         UdfManager.create(session)
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
index ecaa5c8..209576d 100644
--- a/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
+++ b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
@@ -57,14 +57,6 @@ object SparderEnv extends Logging {
     getSparkSession.sparkContext.conf.get(key)
   }
 
-  def getActiveJobs(): Int = {
-    SparderEnv.getSparkSession.sparkContext.jobProgressListener.activeJobs.size
-  }
-
-  def getFailedJobs(): Int = {
-    SparderEnv.getSparkSession.sparkContext.jobProgressListener.failedJobs.size
-  }
-
   def getAsyncResultCore: Int = {
     val sparkConf = getSparkSession.sparkContext.getConf
     val instances = sparkConf.get("spark.executor.instances").toInt
diff --git a/storage-parquet/pom.xml b/storage-parquet/pom.xml
index 3778031..88ad6f9 100644
--- a/storage-parquet/pom.xml
+++ b/storage-parquet/pom.xml
@@ -103,6 +103,18 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-column</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>1.8.1</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
index 5009a51..62c294d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
@@ -73,11 +73,6 @@ public class CubeSparkRPC implements IGTStorage {
 
         JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "");
 
-<<<<<<< HEAD
-=======
-        String cubooidRootPath = jobBuilderSupport.getCuboidRootPath();
-
->>>>>>> 198041d63... KYLIN-3625 Init query
         List<List<Long>> layeredCuboids = cubeSegment.getCuboidScheduler().getCuboidsByLayer();
         int level = 0;
         for (List<Long> levelCuboids : layeredCuboids) {
@@ -87,13 +82,8 @@ public class CubeSparkRPC implements IGTStorage {
             level++;
         }
 
-<<<<<<< HEAD
-        String dataFolderName;
         String parquetRootPath = jobBuilderSupport.getParquetOutputPath();
-        dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(parquetRootPath, level) + "/" + cuboid.getId();
-=======
-        String dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(cubooidRootPath, level) + "/" + cuboid.getId();
->>>>>>> 198041d63... KYLIN-3625 Init query
+        String dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(parquetRootPath, level) + "/" + cuboid.getId();
 
         builder.setGtScanRequest(scanRequest.toByteArray()).setGtScanRequestId(scanReqId)
                 .setKylinProperties(KylinConfig.getInstanceFromEnv().exportAllToString())
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
index 9096679..3129b8e 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
@@ -37,6 +37,7 @@ public class ParquetPayload {
     private long startTime;
     private int storageType;
 
+    @SuppressWarnings("checkstyle:ParameterNumber")
     private ParquetPayload(byte[] gtScanRequest, String gtScanRequestId, String kylinProperties, String realizationId,
                            String segmentId, String dataFolderName, int maxRecordLength, List<Integer> parquetColumns,
                            boolean isUseII, String realizationType, String queryId, boolean spillEnabled, long maxScanBytes,
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
new file mode 100644
index 0000000..c778ee0
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.parquet.example.data.Group;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by Yichen on 11/14/18.
+ */
+public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritable, Group> {
+    private ParquetConvertor convertor;
+    private MultipleOutputs<NullWritable, Group> mos;
+    private CubeSegment cubeSegment;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        super.bindCurrentConfiguration(conf);
+        mos = new MultipleOutputs(context);
+
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        String segmentId = conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        cubeSegment = cube.getSegmentById(segmentId);
+        Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
+        final IDimensionEncodingMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+        SerializableConfiguration sConf = new SerializableConfiguration(conf);
+
+        Map<TblColRef, String> colTypeMap = Maps.newHashMap();
+        Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
+        ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cube.getDescriptor(), colTypeMap, meaTypeMap);
+        convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, sConf, colTypeMap, meaTypeMap);
+    }
+
+    @Override
+    protected void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        long cuboidId = Bytes.toLong(key.getBytes(), RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+        int layerNumber = cubeSegment.getCuboidScheduler().getCuboidLayerMap().get(cuboidId);
+        int partitionId = context.getTaskAttemptID().getTaskID().getId();
+
+        for (Text value : values) {
+            try {
+                Group group = convertor.parseValueToGroup(key, value);
+                String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel("", layerNumber)
+                        + "/" + ParquetJobSteps.getCuboidOutputFileName(cuboidId, partitionId);
+                mos.write(MRCubeParquetJob.BY_LAYER_OUTPUT, null, group, output);
+            } catch (IOException e){
+                throw new IOException(e);
+            }
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+    }
+}
\ No newline at end of file
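Each reduced record is routed through MultipleOutputs into a per-layer, per-cuboid file rather than the default part-r-* files. The file name prefix comes from ParquetJobSteps.getCuboidOutputFileName() (added below) and the directory from the layer number. A sketch with made-up values (cuboid 255, layer 2, reduce task 3):

    // Sketch only: the named-output path composed in doReduce() above.
    String fileName = ParquetJobSteps.getCuboidOutputFileName(255L, 3);   // "cuboid_255_part3"
    String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel("", 2) + "/" + fileName;
    // mos.write(MRCubeParquetJob.BY_LAYER_OUTPUT, null, group, output);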
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
new file mode 100644
index 0000000..7fcf95d
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Created by Yichen on 11/12/18.
+ */
+public class CuboidToPartitionMapping implements Serializable {
+    private static final Logger logger = LoggerFactory.getLogger(CuboidToPartitionMapping.class);
+
+    private Map<Long, List<Integer>> cuboidPartitions;
+    private int partitionNum;
+
+    public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
+        this.cuboidPartitions = cuboidPartitions;
+        int partitions = 0;
+        for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+            partitions = partitions + entry.getValue().size();
+        }
+        this.partitionNum = partitions;
+    }
+
+    public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig) throws IOException {
+        cuboidPartitions = Maps.newHashMap();
+
+        Set<Long> allCuboidIds = cubeSeg.getCuboidScheduler().getAllCuboidIds();
+
+        calculatePartitionId(cubeSeg, kylinConfig, allCuboidIds);
+    }
+
+    public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
+        cuboidPartitions = Maps.newHashMap();
+
+        List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
+
+        calculatePartitionId(cubeSeg, kylinConfig, layeredCuboids);
+    }
+
+    private void calculatePartitionId(CubeSegment cubeSeg, KylinConfig kylinConfig, Collection<Long> cuboidIds) throws IOException {
+        int position = 0;
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
+        for (Long cuboidId : cuboidIds) {
+            int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
+            List<Integer> positions = Lists.newArrayListWithCapacity(partition);
+
+            for (int i = position; i < position + partition; i++) {
+                positions.add(i);
+            }
+
+            cuboidPartitions.put(cuboidId, positions);
+            position = position + partition;
+        }
+
+        this.partitionNum = position;
+    }
+
+    public String serialize() throws JsonProcessingException {
+        return JsonUtil.writeValueAsString(cuboidPartitions);
+    }
+
+    public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
+        Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
+        return new CuboidToPartitionMapping(cuboidPartitions);
+    }
+
+    public int getNumPartitions() {
+        return this.partitionNum;
+    }
+
+    public long getCuboidIdByPartition(int partition) {
+        for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+            if (entry.getValue().contains(partition)) {
+                return entry.getKey();
+            }
+        }
+
+        throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
+    }
+
+    public int getPartitionByKey(byte[] key) {
+        long cuboidId = Bytes.toLong(key, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+        List<Integer> partitions = cuboidPartitions.get(cuboidId);
+        int partitionKey = mod(key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, key.length, partitions.size());
+
+        return partitions.get(partitionKey);
+    }
+
+    private int mod(byte[] src, int start, int end, int total) {
+        int sum = Bytes.hashBytes(src, start, end - start);
+        int mod = sum % total;
+        if (mod < 0)
+            mod += total;
+
+        return mod;
+    }
+
+    public int getPartitionNumForCuboidId(long cuboidId) {
+        return cuboidPartitions.get(cuboidId).size();
+    }
+
+    public String getPartitionFilePrefix(int partition) {
+        long cuboid = getCuboidIdByPartition(partition);
+        int partNum = partition % getPartitionNumForCuboidId(cuboid);
+        String prefix = ParquetJobSteps.getCuboidOutputFileName(cuboid, partNum);
+
+        return prefix;
+    }
+
+    private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
+        double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
+        float rddCut = kylinConfig.getParquetFileSizeMB();
+        int partition = (int) (cuboidSize / rddCut);
+        partition = Math.max(kylinConfig.getParquetMinPartitions(), partition);
+        partition = Math.min(kylinConfig.getParquetMaxPartitions(), partition);
+
+        logger.info("cuboid:{}, est_size:{}, partitions:{}", cuboidId, cuboidSize, partition);
+        return partition;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+            sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
+        }
+
+        return sb.toString();
+    }
+}
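CuboidToPartitionMapping assigns every cuboid a contiguous block of partition (reduce task) ids and round-trips the assignment as JSON through the job configuration. A small worked example with invented ids, two partitions for cuboid 255 and one for cuboid 254:

    // Sketch only: exercising the mapping added above.
    Map<Long, List<Integer>> assignment = Maps.newHashMap();
    assignment.put(255L, Lists.newArrayList(0, 1));
    assignment.put(254L, Lists.newArrayList(2));
    CuboidToPartitionMapping mapping = new CuboidToPartitionMapping(assignment);
    // mapping.getNumPartitions()        == 3
    // mapping.getCuboidIdByPartition(2) == 254
    // mapping.serialize()               -> JSON like {"254":[2],"255":[0,1]} (key order may vary)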
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
new file mode 100644
index 0000000..7113a7a
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.parquet.example.data.Group;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.parquet.hadoop.example.ExampleOutputFormat;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+
+
+/**
+ * Created by Yichen on 11/9/18.
+ */
+public class MRCubeParquetJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(MRCubeParquetJob.class);
+
+    final static String BY_LAYER_OUTPUT = "ByLayer";
+    private Options options;
+
+    public MRCubeParquetJob(){
+        options = new Options();
+        options.addOption(OPTION_JOB_NAME);
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        parseOptions(options, args);
+
+        final Path inputPath = new Path(getOptionValue(OPTION_INPUT_PATH));
+        final Path outputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+        final String cubeName = getOptionValue(OPTION_CUBE_NAME);
+        logger.info("CubeName: ", cubeName);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+
+        CubeSegment cubeSegment = cube.getSegmentById(segmentId);
+        logger.info("Input path: {}", inputPath);
+        logger.info("Output path: {}", outputPath);
+
+        job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+        setJobClasspath(job, cube.getConfig());
+        CuboidToPartitionMapping cuboidToPartitionMapping= new CuboidToPartitionMapping(cubeSegment, kylinConfig);
+        String jsonStr = cuboidToPartitionMapping.serialize();
+        logger.info("Total Partition: {}", cuboidToPartitionMapping.getNumPartitions());
+
+        final IDimensionEncodingMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+
+        Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
+
+        MessageType schema = ParquetConvertor.cuboidToMessageType(baseCuboid, dimEncMap, cubeSegment.getCubeDesc());
+        logger.info("Schema: {}", schema.toString());
+
+        try {
+
+            job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
+
+
+            addInputDirs(inputPath.toString(), job);
+            FileOutputFormat.setOutputPath(job, outputPath);
+
+            job.setJobName("Converting Parquet File for: " + cubeName + " segment " + segmentId);
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
+
+            job.setPartitionerClass(CuboidPartitioner.class);
+
+            job.setOutputKeyClass(NullWritable.class);
+            job.setOutputValueClass(Group.class);
+            job.setReducerClass(ConvertToParquetReducer.class);
+            job.setNumReduceTasks(cuboidToPartitionMapping.getNumPartitions());
+
+            MultipleOutputs.addNamedOutput(job, BY_LAYER_OUTPUT, ExampleOutputFormat.class, NullWritable.class, Group.class);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
+            ExampleOutputFormat.setSchema(job, schema);
+            LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
+            attachCubeMetadataWithDict(cube, job.getConfiguration());
+
+            this.deletePath(job.getConfiguration(), outputPath);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            return waitForCompletion(job);
+
+        } finally {
+            if (job != null) {
+                cleanupTempConfFile(job.getConfiguration());
+            }
+        }
+    }
+
+    public static class CuboidPartitioner extends Partitioner<Text, Text> implements Configurable {
+        private Configuration conf;
+        private CuboidToPartitionMapping mapping;
+
+        @Override
+        public void setConf(Configuration configuration) {
+            this.conf = configuration;
+            try {
+                mapping = CuboidToPartitionMapping.deserialize(conf.get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public int getPartition(Text key, Text value, int numPartitions) {
+            return mapping.getPartitionByKey(key.getBytes());
+        }
+
+        @Override
+        public Configuration getConf() {
+            return conf;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        MRCubeParquetJob job = new MRCubeParquetJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
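CuboidPartitioner delegates entirely to the mapping deserialized from the job configuration: it reads the cuboid id out of the row key and hashes the remaining key bytes into that cuboid's partition block, so each cuboid lands in its own small set of reduce tasks and hence its own parquet files. A sketch of the routing, reusing the mapping from the previous illustration:

    // Sketch only: what getPartition() does for one key.
    long cuboidId = Bytes.toLong(key.getBytes(), RowConstants.ROWKEY_SHARDID_LEN,
            RowConstants.ROWKEY_CUBOIDID_LEN);                  // e.g. 255
    int partition = mapping.getPartitionByKey(key.getBytes()); // 0 or 1 for cuboid 255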
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
new file mode 100644
index 0000000..9b9578d
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.kv.RowKeyDecoderParquet;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dimension.AbstractDateDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.FixedLenDimEnc;
+import org.apache.kylin.dimension.FixedLenHexDimEnc;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.measure.basic.BigDecimalIngester;
+import org.apache.kylin.measure.basic.DoubleIngester;
+import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Yichen on 11/9/18.
+ */
+public class ParquetConvertor {
+    private static final Logger logger = LoggerFactory.getLogger(ParquetConvertor.class);
+
+    public static final String FIELD_CUBOID_ID = "cuboidId";
+    public static final String DATATYPE_DECIMAL = "decimal";
+    public static final String DATATYPE_INT = "int";
+    public static final String DATATYPE_LONG = "long";
+    public static final String DATATYPE_DOUBLE = "double";
+    public static final String DATATYPE_STRING = "string";
+    public static final String DATATYPE_BINARY = "binary";
+
+    private RowKeyDecoder decoder;
+    private BufferedMeasureCodec measureCodec;
+    private Map<TblColRef, String> colTypeMap;
+    private Map<MeasureDesc, String> meaTypeMap;
+    private BigDecimalSerializer serializer;
+    private GroupFactory factory;
+    private List<MeasureDesc> measureDescs;
+
+    public ParquetConvertor(String cubeName, String segmentId, KylinConfig kConfig, SerializableConfiguration sConf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap){
+        KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+
+        this.colTypeMap = colTypeMap;
+        this.meaTypeMap = meaTypeMap;
+        serializer = new BigDecimalSerializer(DataType.getType(DATATYPE_DECIMAL));
+
+        CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        measureDescs = cubeDesc.getMeasures();
+        decoder = new RowKeyDecoderParquet(cubeSegment);
+        factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(sConf.get()));
+        measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+    }
+
+    protected Group parseValueToGroup(Text rawKey, Text rawValue) throws IOException{
+        Group group = factory.newGroup();
+
+        long cuboidId = decoder.decode(rawKey.getBytes());
+        List<String> values = decoder.getValues();
+        List<TblColRef> columns = decoder.getColumns();
+
+        // for check
+        group.append(FIELD_CUBOID_ID, cuboidId);
+
+        for (int i = 0; i < columns.size(); i++) {
+            TblColRef column = columns.get(i);
+            parseColValue(group, column, values.get(i));
+        }
+
+        int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(rawValue.getBytes()));
+
+        int valueOffset = 0;
+        for (int i = 0; i < valueLengths.length; ++i) {
+            MeasureDesc measureDesc = measureDescs.get(i);
+            parseMeaValue(group, measureDesc, rawValue.getBytes(), valueOffset, valueLengths[i]);
+            valueOffset += valueLengths[i];
+        }
+
+        return group;
+    }
+
+    private void parseColValue(final Group group, final TblColRef colRef, final String value) {
+        if (value==null) {
+            logger.error("value is null");
+            return;
+        }
+        switch (colTypeMap.get(colRef)) {
+            case DATATYPE_INT:
+                group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
+                break;
+            case DATATYPE_LONG:
+                group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
+                break;
+            default:
+                group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
+                break;
+        }
+    }
+
+    private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) throws IOException {
+        if (value==null) {
+            logger.error("value is null");
+            return;
+        }
+        switch (meaTypeMap.get(measureDesc)) {
+            case DATATYPE_LONG:
+                group.append(measureDesc.getName(), BytesUtil.readVLong(ByteBuffer.wrap(value, offset, length)));
+                break;
+            case DATATYPE_DOUBLE:
+                group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
+                break;
+            case DATATYPE_DECIMAL:
+                BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
+                decimal = decimal.setScale(4);
+                group.append(measureDesc.getName(), Binary.fromByteArray(decimal.unscaledValue().toByteArray()));
+                break;
+            default:
+                group.append(measureDesc.getName(), Binary.fromByteArray(value, offset, length));
+                break;
+        }
+    }
+
+    protected static MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc) {
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+
+        List<TblColRef> colRefs = cuboid.getColumns();
+
+        builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(FIELD_CUBOID_ID);
+
+        for (TblColRef colRef : colRefs) {
+            DimensionEncoding dimEnc = dimEncMap.get(colRef);
+            colToMessageType(dimEnc, colRef, builder);
+        }
+
+        MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
+            DataType meaDataType = measureDesc.getFunction().getReturnDataType();
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
+
+            meaColToMessageType(measureType, measureDesc.getName(), meaDataType, aggrIngesters[i], builder);
+        }
+
+        return builder.named(String.valueOf(cuboid.getId()));
+    }
+
+    protected static void generateTypeMap(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap){
+        List<TblColRef> colRefs = cuboid.getColumns();
+
+        for (TblColRef colRef : colRefs) {
+            DimensionEncoding dimEnc = dimEncMap.get(colRef);
+            addColTypeMap(dimEnc, colRef, colTypeMap);
+        }
+
+        MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
+            addMeaColTypeMap(measureType, measureDesc, aggrIngesters[i], meaTypeMap);
+        }
+    }
+    private static String getColName(TblColRef colRef) {
+        return colRef.getTableAlias() + "_" + colRef.getName();
+    }
+
+    private static void addColTypeMap(DimensionEncoding dimEnc, TblColRef colRef, Map<TblColRef, String> colTypeMap) {
+        if (dimEnc instanceof AbstractDateDimEnc) {
+            colTypeMap.put(colRef, DATATYPE_LONG);
+        } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
+            DataType colDataType = colRef.getType();
+            if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
+                colTypeMap.put(colRef, DATATYPE_LONG);
+            } else {
+                // stringFamily && default
+                colTypeMap.put(colRef, DATATYPE_STRING);
+            }
+        } else {
+            colTypeMap.put(colRef, DATATYPE_INT);
+        }
+    }
+
+    private static Map<MeasureDesc, String> addMeaColTypeMap(MeasureType measureType, MeasureDesc measureDesc, MeasureIngester aggrIngester, Map<MeasureDesc, String> meaTypeMap) {
+        if (measureType instanceof BasicMeasureType) {
+            MeasureIngester measureIngester = aggrIngester;
+            if (measureIngester instanceof LongIngester) {
+                meaTypeMap.put(measureDesc, DATATYPE_LONG);
+            } else if (measureIngester instanceof DoubleIngester) {
+                meaTypeMap.put(measureDesc, DATATYPE_DOUBLE);
+            } else if (measureIngester instanceof BigDecimalIngester) {
+                meaTypeMap.put(measureDesc, DATATYPE_DECIMAL);
+            } else {
+                meaTypeMap.put(measureDesc, DATATYPE_BINARY);
+            }
+        } else {
+            meaTypeMap.put(measureDesc, DATATYPE_BINARY);
+        }
+        return meaTypeMap;
+    }
+
+    private static void colToMessageType(DimensionEncoding dimEnc, TblColRef colRef, Types.MessageTypeBuilder builder) {
+        if (dimEnc instanceof AbstractDateDimEnc) {
+            builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+        } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
+            DataType colDataType = colRef.getType();
+            if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
+                builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+            } else {
+                // stringFamily && default
+                builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
+            }
+        } else {
+            builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
+        }
+    }
+
+    private static void meaColToMessageType(MeasureType measureType, String meaDescName, DataType meaDataType, MeasureIngester aggrIngester, Types.MessageTypeBuilder builder) {
+        if (measureType instanceof BasicMeasureType) {
+            MeasureIngester measureIngester = aggrIngester;
+            if (measureIngester instanceof LongIngester) {
+                builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(meaDescName);
+            } else if (measureIngester instanceof DoubleIngester) {
+                builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(meaDescName);
+            } else if (measureIngester instanceof BigDecimalIngester) {
+                builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(meaDescName);
+            } else {
+                builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(meaDescName);
+            }
+        } else {
+            builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(meaDescName);
+        }
+    }
+}
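cuboidToMessageType() derives one Parquet schema per cuboid: an optional INT64 cuboidId column, one optional column per dimension (INT64, UTF8 binary or INT32 depending on the dimension encoding) and one required column per measure. A hypothetical example, with invented dimension/measure names, for a cuboid with one date-encoded dimension and one decimal(19,4) measure:

    // Sketch only: the equivalent of what cuboidToMessageType() would build.
    MessageType schema = Types.buildMessage()
            .optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId")
            .optional(PrimitiveType.PrimitiveTypeName.INT64).named("KYLIN_SALES_PART_DT")
            .required(PrimitiveType.PrimitiveTypeName.BINARY)
                .as(OriginalType.DECIMAL).precision(19).scale(4).named("GMV_SUM")
            .named("255");
    // schema.toString():
    //   message 255 {
    //     optional int64 cuboidId;
    //     optional int64 KYLIN_SALES_PART_DT;
    //     required binary GMV_SUM (DECIMAL(19,4));
    //   }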
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
index b47a03a..625737a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
@@ -32,6 +32,9 @@ public abstract class ParquetJobSteps extends JobBuilderSupport {
         super(seg, null);
     }
 
-
-    abstract public AbstractExecutable createConvertToParquetStep(String jobId);
+    public static String getCuboidOutputFileName(long cuboid, int partNum) {
+        String fileName = "cuboid_" + cuboid + "_part" + partNum;
+        return fileName;
+    }
+    abstract public AbstractExecutable convertToParquetStep(String jobId);
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
index fe85e24..f54a7a0 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
@@ -26,11 +26,18 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
+import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
+import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IEngineAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
+
 
 /**
  * Created by Yichen on 10/16/18.
@@ -42,14 +49,7 @@ public class ParquetMROutput implements IMROutput2 {
     @Override
     public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg) {
 
-        boolean useSpark = seg.getCubeDesc().getEngineType() == IEngineAware.ID_SPARK;
-
-
-        // TODO need refactor
-        if (!useSpark){
-            throw new RuntimeException("Cannot adapt to MR engine");
-        }
-        final ParquetJobSteps steps = new ParquetSparkSteps(seg);
+        final ParquetJobSteps steps = new ParquetMRSteps(seg);
 
         return new IMRBatchCubingOutputSide2() {
 
@@ -59,6 +59,7 @@ public class ParquetMROutput implements IMROutput2 {
 
             @Override
             public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
             }
 
             @Override
@@ -83,21 +84,76 @@ public class ParquetMROutput implements IMROutput2 {
         @Override
         public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler,
                                        int level) throws Exception {
+            int reducerNum = 1;
+            Class mapperClass = job.getMapperClass();
+
+            //allow user specially set config for base cuboid step
+            if (mapperClass == HiveToBaseCuboidMapper.class) {
+                for (Map.Entry<String, String> entry : segment.getConfig().getBaseCuboidMRConfigOverride().entrySet()) {
+                    job.getConfiguration().set(entry.getKey(), entry.getValue());
+                }
+            }
 
+            if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) {
+                reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, cuboidScheduler,
+                        AbstractHadoopJob.getTotalMapInputMB(job), level);
+            } else if (mapperClass == InMemCuboidMapper.class) {
+                reducerNum = MapReduceUtil.getInmemCubingReduceTaskNum(segment, cuboidScheduler);
+            }
             Path outputPath = new Path(output);
             FileOutputFormat.setOutputPath(job, outputPath);
             job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setNumReduceTasks(reducerNum);
             HadoopUtil.deletePath(job.getConfiguration(), outputPath);
         }
     }
 
     @Override
     public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg) {
-        return null;
+        final ParquetJobSteps steps = new ParquetMRSteps(seg);
+        return new IMRBatchMergeOutputSide2() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+            }
+
+            @Override
+            public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+            }
+
+            @Override
+            public IMRMergeOutputFormat getOuputFormat() {
+                return null;
+            }
+        };
     }
 
     @Override
     public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(CubeSegment seg) {
-        return null;
+        final ParquetJobSteps steps = new ParquetMRSteps(seg);
+
+        return new IMRBatchOptimizeOutputSide2() {
+            @Override
+            public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
+            }
+
+            @Override
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            }
+
+            @Override
+            public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+            }
+
+        };
     }
 }
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
new file mode 100644
index 0000000..829f074
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+/**
+ * Created by Yichen on 11/8/18.
+ */
+public class ParquetMRSteps extends ParquetJobSteps{
+    public ParquetMRSteps(CubeSegment seg) {
+        super(seg);
+    }
+
+    @Override
+    public AbstractExecutable convertToParquetStep(String jobId) {
+        String cuboidRootPath = getCuboidRootPath(jobId);
+        String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+
+        final MapReduceExecutable mrExecutable = new MapReduceExecutable();
+        mrExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_PARQUET);
+        mrExecutable.setMapReduceJobClass(MRCubeParquetJob.class);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getParquetOutputPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Parquet_Generator_" + seg.getRealization().getName()+ "_Step");
+
+        mrExecutable.setMapReduceParams(cmd.toString());
+        mrExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_PARQUET);
+        mrExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+        return mrExecutable;
+    }
+
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
index 176afd0..794ede4 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
@@ -36,7 +36,7 @@ public class ParquetSparkOutput implements ISparkOutput {
 
             @Override
             public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createConvertToParquetStep(jobFlow.getId()));
+                jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
 
             }
 
@@ -58,7 +58,7 @@ public class ParquetSparkOutput implements ISparkOutput {
 
             @Override
             public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createConvertToParquetStep(jobFlow.getId()));
+                jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
 
             }
 
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
index 296bc68..65bd30a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
@@ -35,7 +35,7 @@ public class ParquetSparkSteps extends ParquetJobSteps {
     }
 
     @Override
-    public AbstractExecutable createConvertToParquetStep(String jobId) {
+    public AbstractExecutable convertToParquetStep(String jobId) {
 
         String cuboidRootPath = getCuboidRootPath(jobId);
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
index def4d8d..4ad7cb3 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
@@ -17,14 +17,10 @@
  */
 package org.apache.kylin.storage.parquet.steps;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -34,54 +30,26 @@ import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.cube.kv.RowKeyDecoderParquet;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dimension.AbstractDateDimEnc;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.dimension.FixedLenHexDimEnc;
 import org.apache.kylin.dimension.IDimensionEncodingMap;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.measure.basic.BigDecimalIngester;
-import org.apache.kylin.measure.basic.DoubleIngester;
-import org.apache.kylin.measure.basic.LongIngester;
-import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
-import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.GroupFactory;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Types;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -93,9 +61,6 @@ import scala.Tuple2;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.Map;
 
 
@@ -172,11 +137,26 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             logger.info("Input path: {}", inputPath);
             logger.info("Output path: {}", outputPath);
 
+            final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
+            final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
+
+            final IDimensionEncodingMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+
+            Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
+            MessageType schema = ParquetConvertor.cuboidToMessageType(baseCuboid, dimEncMap, cubeSegment.getCubeDesc());
+            ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cubeSegment.getCubeDesc(), colTypeMap, meaTypeMap);
+            GroupWriteSupport.setSchema(schema, job.getConfiguration());
+
+            GenerateGroupRDDFunction groupPairFunction = new GenerateGroupRDDFunction(cubeName, cubeSegment.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap);
+
+            logger.info("Schema: {}", schema.toString());
+
             // Read from cuboid and save to parquet
             for (int level = 0; level <= totalLevels; level++) {
                 String cuboidPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputPath, level);
                 allRDDs[level] = SparkUtil.parseInputPath(cuboidPath, fs, sc, Text.class, Text.class);
-                saveToParquet(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
+                saveToParquet(allRDDs[level], groupPairFunction, cubeSegment, outputPath, level, job, envConfig);
             }
 
             logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
@@ -190,45 +170,28 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
 
     }
 
-    protected void saveToParquet(JavaPairRDD<Text, Text> rdd, String metaUrl, String cubeName, CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws Exception {
-        final IDimensionEncodingMap dimEncMap = cubeSeg.getDimensionEncodingMap();
-
-        Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSeg.getCubeDesc());
-
-        final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
-        final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
-
-        MessageType schema = cuboidToMessageType(baseCuboid, dimEncMap, cubeSeg.getCubeDesc(), colTypeMap, meaTypeMap);
-
-        logger.info("Schema: {}", schema.toString());
-
+    protected void saveToParquet(JavaPairRDD<Text, Text> rdd, GenerateGroupRDDFunction groupRDDFunction, CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws IOException {
         final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
 
         logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
 
-        JavaPairRDD<Text, Text> repartitionedRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping));
-
         String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(parquetOutput, level);
 
         job.setOutputFormatClass(CustomParquetOutputFormat.class);
-        GroupWriteSupport.setSchema(schema, job.getConfiguration());
         CustomParquetOutputFormat.setOutputPath(job, new Path(output));
         CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
         CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
 
-        JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping))
-                                                .mapToPair(new GenerateGroupRDDFunction(cubeName, cubeSeg.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap));
+        JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping)).mapToPair(groupRDDFunction);
 
         groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
     }
 
     static class CuboidPartitioner extends Partitioner {
         private CuboidToPartitionMapping mapping;
-        private boolean enableSharding;
 
-        public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping, boolean enableSharding) {
+        public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
             this.mapping = cuboidToPartitionMapping;
-            this.enableSharding =enableSharding;
         }
 
         @Override
@@ -239,121 +202,11 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         @Override
         public int getPartition(Object key) {
             Text textKey = (Text)key;
-            return mapping.getPartitionForCuboidId(textKey.getBytes());
-        }
-    }
-
-    public static class CuboidToPartitionMapping implements Serializable {
-        private Map<Long, List<Integer>> cuboidPartitions;
-        private int partitionNum;
-
-        public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
-            this.cuboidPartitions = cuboidPartitions;
-            int partitions = 0;
-            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
-                partitions = partitions + entry.getValue().size();
-            }
-            this.partitionNum = partitions;
-        }
-
-        public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
-            cuboidPartitions = Maps.newHashMap();
-
-            List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
-            CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
-
-            int position = 0;
-            for (Long cuboidId : layeredCuboids) {
-                int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
-                List<Integer> positions = Lists.newArrayListWithCapacity(partition);
-
-                for (int i = position; i < position + partition; i++) {
-                    positions.add(i);
-                }
-
-                cuboidPartitions.put(cuboidId, positions);
-                position = position + partition;
-            }
-
-            this.partitionNum = position;
-        }
-
-        public String serialize() throws JsonProcessingException {
-            return JsonUtil.writeValueAsString(cuboidPartitions);
-        }
-
-        public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
-            Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
-            return new CuboidToPartitionMapping(cuboidPartitions);
-        }
-
-        public int getNumPartitions() {
-            return this.partitionNum;
-        }
-
-        public long getCuboidIdByPartition(int partition) {
-            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
-                if (entry.getValue().contains(partition)) {
-                    return entry.getKey();
-                }
-            }
-
-            throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
-        }
-
-        public int getPartitionForCuboidId(byte[] key) {
-            long cuboidId = Bytes.toLong(key, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
-            List<Integer> partitions = cuboidPartitions.get(cuboidId);
-            int partitionKey = mod(key, RowConstants.ROWKEY_COL_DEFAULT_LENGTH, key.length, partitions.size());
-
-            return partitions.get(partitionKey);
-        }
-
-        private int mod(byte[] src, int start, int end, int total) {
-            int sum = Bytes.hashBytes(src, start, end - start);
-            int mod = sum % total;
-            if (mod < 0)
-                mod += total;
-
-            return mod;
-        }
-
-        public int getPartitionNumForCuboidId(long cuboidId) {
-            return cuboidPartitions.get(cuboidId).size();
-        }
-
-        public String getPartitionFilePrefix(int partition) {
-            String prefix = "cuboid_";
-            long cuboid = getCuboidIdByPartition(partition);
-            int partNum = partition % getPartitionNumForCuboidId(cuboid);
-            prefix = prefix + cuboid + "_part" + partNum;
-
-            return prefix;
-        }
-
-        private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
-            double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
-            float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
-            int partition = (int) (cuboidSize / (rddCut * 10));
-            partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
-            partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
-            logger.info("cuboid:{}, est_size:{}, partitions:{}", cuboidId, cuboidSize, partition);
-            return partition;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
-                sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
-            }
-
-            return sb.toString();
+            return mapping.getPartitionByKey(textKey.getBytes());
         }
     }
 
     public static class CustomParquetOutputFormat extends ParquetOutputFormat {
-        public static final String CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
 
         @Override
         public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
@@ -361,7 +214,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
             TaskID taskId = context.getTaskAttemptID().getTaskID();
             int partition = taskId.getId();
 
-            CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(CUBOID_TO_PARTITION_MAPPING));
+            CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
 
             return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
         }
@@ -369,7 +222,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
             String jsonStr = cuboidToPartitionMapping.serialize();
 
-            job.getConfiguration().set(CUBOID_TO_PARTITION_MAPPING, jsonStr);
+            job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
         }
     }
 
@@ -379,14 +232,10 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         private String segmentId;
         private String metaUrl;
         private SerializableConfiguration conf;
-        private List<MeasureDesc> measureDescs;
-        private RowKeyDecoder decoder;
         private Map<TblColRef, String> colTypeMap;
         private Map<MeasureDesc, String> meaTypeMap;
-        private GroupFactory factory;
-        private BufferedMeasureCodec measureCodec;
-        private BigDecimalSerializer serializer;
-        private int count = 0;
+
+        private transient ParquetConvertor convertor;
 
         public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
             this.cubeName = cubeName;
@@ -398,17 +247,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
         }
 
         private void init() {
-            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
-            KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
-            CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
-            CubeDesc cubeDesc = cubeInstance.getDescriptor();
-            CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-            measureDescs = cubeDesc.getMeasures();
-            decoder = new RowKeyDecoderParquet(cubeSegment);
-            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
-            measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-            serializer = new BigDecimalSerializer(DataType.getType("decimal"));
-            initialized = true;
+            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, conf, colTypeMap, meaTypeMap);
         }
 
         @Override
@@ -421,133 +261,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
                     }
                 }
             }
-
-            long cuboid = decoder.decode4Parquet(tuple._1.getBytes());
-            List<String> values = decoder.getValues();
-            List<TblColRef> columns = decoder.getColumns();
-
-            Group group = factory.newGroup();
-
-            // for check
-            group.append("cuboidId", cuboid);
-
-            for (int i = 0; i < columns.size(); i++) {
-                TblColRef column = columns.get(i);
-                parseColValue(group, column, values.get(i));
-            }
-
-            count ++;
-
-            byte[] encodedBytes = tuple._2().getBytes();
-            int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
-
-            int valueOffset = 0;
-            for (int i = 0; i < valueLengths.length; ++i) {
-                MeasureDesc measureDesc = measureDescs.get(i);
-                parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i]);
-                valueOffset += valueLengths[i];
-            }
+            Group group = convertor.parseValueToGroup(tuple._1(), tuple._2());
 
             return new Tuple2<>(null, group);
         }
-
-        private void parseColValue(final Group group, final TblColRef colRef, final String value) {
-            if (value==null) {
-                logger.info("value is null");
-                return;
-            }
-            switch (colTypeMap.get(colRef)) {
-                case "int":
-                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
-                    break;
-                case "long":
-                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
-                    break;
-                default:
-                    group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
-                    break;
-            }
-        }
-
-        private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) throws IOException {
-            if (value==null) {
-                logger.info("value is null");
-                return;
-            }
-            switch (meaTypeMap.get(measureDesc)) {
-                case "long":
-                    group.append(measureDesc.getName(), BytesUtil.readVLong(ByteBuffer.wrap(value, offset, length)));
-                    break;
-                case "double":
-                    group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
-                    break;
-                case "decimal":
-                    BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
-                    decimal = decimal.setScale(4);
-                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
-                    break;
-                default:
-                    group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
-                    break;
-            }
-        }
-    }
-
-    private MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
-        Types.MessageTypeBuilder builder = Types.buildMessage();
-
-        List<TblColRef> colRefs = cuboid.getColumns();
-
-        builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId");
-
-        for (TblColRef colRef : colRefs) {
-            DimensionEncoding dimEnc = dimEncMap.get(colRef);
-
-            if (dimEnc instanceof AbstractDateDimEnc) {
-                builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
-                colTypeMap.put(colRef, "long");
-            } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
-                org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
-                builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
-                colTypeMap.put(colRef, "string");
-            } else {
-                builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
-                colTypeMap.put(colRef, "int");
-            }
-        }
-
-        MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-
-        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
-            MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
-            org.apache.kylin.metadata.datatype.DataType meaDataType = measureDesc.getFunction().getReturnDataType();
-            MeasureType measureType = measureDesc.getFunction().getMeasureType();
-
-            if (measureType instanceof BasicMeasureType) {
-                MeasureIngester measureIngester = aggrIngesters[i];
-                if (measureIngester instanceof LongIngester) {
-                    builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "long");
-                } else if (measureIngester instanceof DoubleIngester) {
-                    builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "double");
-                } else if (measureIngester instanceof BigDecimalIngester) {
-                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "decimal");
-                } else {
-                    builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
-                    meaTypeMap.put(measureDesc, "binary");
-                }
-            } else {
-                builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
-                meaTypeMap.put(measureDesc, "binary");
-            }
-        }
-
-        return builder.named(String.valueOf(cuboid.getId()));
-    }
-
-    private String getColName(TblColRef colRef) {
-        return colRef.getTableAlias() + "_" + colRef.getName();
     }
 }
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
index 0590999..e0a9508 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
@@ -79,9 +79,9 @@ public class ItClassLoader extends URLClassLoader {
                 e.printStackTrace();
             }
         }
-        String spark_home = System.getenv("SPARK_HOME");
+        String sparkHome = System.getenv("SPARK_HOME");
         try {
-            File sparkJar = findFile(spark_home + "/jars", "spark-yarn_.*.jar");
+            File sparkJar = findFile(sparkHome + "/jars", "spark-yarn_.*.jar");
             addURL(sparkJar.toURI().toURL());
             addURL(new File("../examples/test_case_data/sandbox").toURI().toURL());
         } catch (MalformedURLException e) {
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
index c69ef9c..afca915 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
@@ -66,15 +66,15 @@ public class ItSparkClassLoader extends URLClassLoader {
     }
 
     public void init() throws MalformedURLException {
-        String spark_home = System.getenv("SPARK_HOME");
-        if (spark_home == null) {
-            spark_home = System.getProperty("SPARK_HOME");
-            if (spark_home == null) {
+        String sparkHome = System.getenv("SPARK_HOME");
+        if (sparkHome == null) {
+            sparkHome = System.getProperty("SPARK_HOME");
+            if (sparkHome == null) {
                 throw new RuntimeException(
                         "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
             }
         }
-        File file = new File(spark_home + "/jars");
+        File file = new File(sparkHome + "/jars");
         File[] jars = file.listFiles();
         for (File jar : jars) {
             addURL(jar.toURI().toURL());
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
index dba782b..8fe211e 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
@@ -62,25 +62,25 @@ public class SparkClassLoader extends URLClassLoader {
     private static Logger logger = LoggerFactory.getLogger(SparkClassLoader.class);
 
     static {
-        String sparkclassloader_spark_cl_preempt_classes = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES");
-        if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_classes)) {
-            SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkclassloader_spark_cl_preempt_classes, ",");
+        String sparkclassloaderSparkClPreemptClasses = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES");
+        if (!StringUtils.isEmpty(sparkclassloaderSparkClPreemptClasses)) {
+            SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkclassloaderSparkClPreemptClasses, ",");
         }
 
-        String sparkclassloader_spark_cl_preempt_files = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES");
-        if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_files)) {
-            SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkclassloader_spark_cl_preempt_files, ",");
+        String sparkclassloaderSparkClPreemptFiles = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES");
+        if (!StringUtils.isEmpty(sparkclassloaderSparkClPreemptFiles)) {
+            SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkclassloaderSparkClPreemptFiles, ",");
         }
 
-        String sparkclassloader_this_cl_precedent_classes = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
-        if (!StringUtils.isEmpty(sparkclassloader_this_cl_precedent_classes)) {
-            THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_this_cl_precedent_classes, ",");
+        String sparkclassloaderThisClPrecedentClasses = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
+        if (!StringUtils.isEmpty(sparkclassloaderThisClPrecedentClasses)) {
+            THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloaderThisClPrecedentClasses, ",");
         }
 
-        String sparkclassloader_parent_cl_precedent_classes = System
+        String sparkclassloaderParentClPrecedentClasses = System
                 .getenv("SPARKCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
-        if (!StringUtils.isEmpty(sparkclassloader_parent_cl_precedent_classes)) {
-            PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_parent_cl_precedent_classes, ",");
+        if (!StringUtils.isEmpty(sparkclassloaderParentClPrecedentClasses)) {
+            PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloaderParentClPrecedentClasses, ",");
         }
 
         try {
@@ -111,6 +111,7 @@ public class SparkClassLoader extends URLClassLoader {
         init();
     }
 
+    @SuppressWarnings("checkstyle:LocalVariableName")
     public void init() throws MalformedURLException {
         String spark_home = System.getenv("SPARK_HOME");
         if (spark_home == null) {
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
index 89717ec..be0ecd9 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
@@ -48,21 +48,21 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
     private static final Set<String> wontFindClasses = new HashSet<>();
 
     static {
-        String tomcatclassloader_parent_cl_precedent_classes = System
+        String tomcatclassloaderParentClPrecedentClasses = System
                 .getenv("TOMCATCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
-        if (!StringUtils.isEmpty(tomcatclassloader_parent_cl_precedent_classes)) {
-            PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_parent_cl_precedent_classes, ",");
+        if (!StringUtils.isEmpty(tomcatclassloaderParentClPrecedentClasses)) {
+            PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloaderParentClPrecedentClasses, ",");
         }
 
-        String tomcatclassloader_this_cl_precedent_classes = System
+        String tomcatclassloaderThisClPrecedentClasses = System
                 .getenv("TOMCATCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
-        if (!StringUtils.isEmpty(tomcatclassloader_this_cl_precedent_classes)) {
-            THIS_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_this_cl_precedent_classes, ",");
+        if (!StringUtils.isEmpty(tomcatclassloaderThisClPrecedentClasses)) {
+            THIS_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloaderThisClPrecedentClasses, ",");
         }
 
-        String tomcatclassloader_codegen_classes = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES");
-        if (!StringUtils.isEmpty(tomcatclassloader_codegen_classes)) {
-            CODEGEN_CLASSES = StringUtils.split(tomcatclassloader_codegen_classes, ",");
+        String tomcatclassloaderCodegenClasses = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES");
+        if (!StringUtils.isEmpty(tomcatclassloaderCodegenClasses)) {
+            CODEGEN_CLASSES = StringUtils.split(tomcatclassloaderCodegenClasses, ",");
         }
 
         wontFindClasses.add("Class");
@@ -98,6 +98,7 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
         init();
     }
 
+    @SuppressWarnings("checkstyle:LocalVariableName")
     public void init() {
         String spark_home = System.getenv("SPARK_HOME");
         try {