You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/12/25 07:59:31 UTC

[1/3] kylin git commit: spark cubing init commit

Repository: kylin
Updated Branches:
  refs/heads/sparkcubing [created] 2e0665c9d


http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index fc2b982..763b537 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -56,8 +57,6 @@ import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 public class HiveMRInput implements IMRInput {
 
     public static String getTableNameForHCat(TableDesc table) {
@@ -74,7 +73,7 @@ public class HiveMRInput implements IMRInput {
     public IMRTableInputFormat getTableInputFormat(TableDesc table) {
         return new HiveTableInputFormat(getTableNameForHCat(table));
     }
-    
+
     @Override
     public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
         return new IMRBatchMergeInputSide() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/tool/pom.xml
----------------------------------------------------------------------
diff --git a/tool/pom.xml b/tool/pom.xml
index 8eddec4..b577ae9 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -40,6 +40,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-spark</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-source-kafka</artifactId>
         </dependency>
         <dependency>


[3/3] kylin git commit: spark cubing init commit

Posted by sh...@apache.org.
spark cubing init commit


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2e0665c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2e0665c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2e0665c9

Branch: refs/heads/sparkcubing
Commit: 2e0665c9d2ed5b24d5cafd3ff8f7334f4f3d5a49
Parents: d11d019
Author: shaofengshi <sh...@apache.org>
Authored: Sun Dec 25 15:59:16 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Dec 25 15:59:16 2016 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   5 +-
 .../main/config/assemblies/source-assembly.xml  |   3 -
 .../java/org/apache/kylin/job/DeployUtil.java   |   1 -
 build/conf/kylin-spark-conf.properties          |  27 ++
 .../apache/kylin/common/KylinConfigBase.java    |  38 +-
 .../kylin/common/persistence/ResourceStore.java |  35 ++
 .../org/apache/kylin/common/util/Array.java     |   2 +-
 .../apache/kylin/common/util/SplittedBytes.java |   2 +-
 .../java/org/apache/kylin/cube/CubeSegment.java |   2 +-
 .../kylin/cube/common/RowKeySplitter.java       |  12 +-
 .../org/apache/kylin/cube/cuboid/Cuboid.java    |  27 +-
 .../kylin/cube/cuboid/CuboidScheduler.java      |  15 +-
 .../kylin/cube/kv/AbstractRowKeyEncoder.java    |   6 +-
 .../org/apache/kylin/cube/kv/CubeDimEncMap.java |   7 +-
 .../apache/kylin/cube/kv/RowKeyColumnIO.java    |   2 +-
 .../org/apache/kylin/cube/kv/RowKeyEncoder.java |  19 +-
 .../kylin/cube/model/AggregationGroup.java      |   4 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   2 +-
 .../cube/model/CubeJoinedFlatTableDesc.java     |   2 +-
 .../cube/model/CubeJoinedFlatTableEnrich.java   |   6 +-
 .../apache/kylin/cube/model/DictionaryDesc.java |   2 +-
 .../apache/kylin/cube/model/DimensionDesc.java  |   2 +-
 .../kylin/cube/model/HBaseColumnDesc.java       |  11 +-
 .../kylin/cube/model/HBaseColumnFamilyDesc.java |   9 +-
 .../kylin/cube/model/HBaseMappingDesc.java      |  15 +-
 .../apache/kylin/cube/model/HierarchyDesc.java  |   2 +-
 .../apache/kylin/cube/model/RowKeyColDesc.java  |   2 +-
 .../org/apache/kylin/cube/model/RowKeyDesc.java |  17 +-
 .../org/apache/kylin/cube/model/SelectRule.java |   2 +-
 .../dict/NumberDictionaryForestBuilder.java     |   2 +-
 .../apache/kylin/dict/StringBytesConverter.java |   2 +-
 .../kylin/job/execution/ExecutableManager.java  |  21 +-
 .../kylin/dimension/AbstractDateDimEnc.java     |  12 +-
 .../apache/kylin/dimension/BooleanDimEnc.java   |   4 +-
 .../kylin/dimension/DictionaryDimEnc.java       |  21 +-
 .../apache/kylin/dimension/FixedLenDimEnc.java  |   4 +-
 .../kylin/dimension/FixedLenHexDimEnc.java      |   4 +-
 .../org/apache/kylin/dimension/IntDimEnc.java   |   4 +-
 .../apache/kylin/dimension/IntegerDimEnc.java   |   4 +-
 .../kylin/dimension/OneMoreByteVLongDimEnc.java |  14 +-
 .../kylin/measure/BufferedMeasureCodec.java     |  10 +-
 .../org/apache/kylin/measure/MeasureCodec.java  |   8 +-
 .../apache/kylin/measure/MeasureIngester.java   |  12 +-
 .../org/apache/kylin/measure/MeasureType.java   |  12 +-
 .../kylin/measure/basic/DoubleIngester.java     |   5 +
 .../measure/basic/DoubleSumAggregator.java      |   3 +-
 .../kylin/measure/basic/LongIngester.java       |   5 +
 .../kylin/measure/basic/LongSumAggregator.java  |   3 +-
 .../kylin/measure/bitmap/BitmapCounter.java     |   8 +-
 .../kylin/measure/bitmap/BitmapMeasureType.java |   5 +
 .../kylin/measure/hllc/HLLCMeasureType.java     |   5 +
 .../kylin/measure/hllc/HLLCSerializer.java      |  11 +-
 .../apache/kylin/measure/raw/RawSerializer.java |   4 +-
 .../apache/kylin/measure/topn/TopNCounter.java  |   2 +-
 .../kylin/measure/topn/TopNMeasureType.java     |   4 +-
 .../metadata/datatype/BigDecimalSerializer.java |   2 +-
 .../metadata/datatype/BooleanSerializer.java    |   9 +-
 .../metadata/datatype/DataTypeSerializer.java   |  17 +-
 .../metadata/datatype/DateTimeSerializer.java   |   9 +-
 .../metadata/datatype/DoubleSerializer.java     |   5 +-
 .../kylin/metadata/datatype/Int4Serializer.java |   9 +-
 .../metadata/datatype/Long8Serializer.java      |   9 +-
 .../kylin/metadata/datatype/LongSerializer.java |   9 +-
 .../kylin/metadata/model/FunctionDesc.java      |  22 +-
 .../apache/kylin/metadata/model/JoinDesc.java   |   7 +-
 .../kylin/metadata/model/JoinTableDesc.java     |   4 +-
 .../apache/kylin/metadata/model/JoinsTree.java  |   9 +-
 .../kylin/metadata/model/MeasureDesc.java       |   9 +-
 .../metadata/model/ModelDimensionDesc.java      |  10 +-
 .../kylin/metadata/model/ParameterDesc.java     |  13 +-
 .../kylin/metadata/model/PartitionDesc.java     |   6 +-
 .../apache/kylin/metadata/model/TableRef.java   |  17 +-
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |   9 +-
 .../kylin/engine/mr/JobBuilderSupport.java      |  12 +-
 .../engine/mr/common/BaseCuboidBuilder.java     | 175 +++++++++
 .../kylin/engine/mr/common/NDCuboidBuilder.java |  96 +++++
 .../engine/mr/steps/BaseCuboidMapperBase.java   | 131 +------
 .../kylin/engine/mr/steps/NDCuboidMapper.java   |  58 +--
 engine-spark/pom.xml                            |   5 +
 .../engine/spark/SparkBatchCubingEngine2.java   |  33 ++
 .../spark/SparkBatchCubingJobBuilder2.java      |  85 +++++
 .../apache/kylin/engine/spark/SparkCubing.java  |  79 ++++-
 .../kylin/engine/spark/SparkCubingV3.java       | 354 +++++++++++++++++++
 .../kylin/engine/spark/SparkExecutable.java     |  26 +-
 .../sandbox/kylin-spark-conf.properties         |  27 ++
 .../test_case_data/sandbox/kylin.properties     |  11 +-
 pom.xml                                         |   2 +-
 .../kylin/rest/controller/CubeController.java   |   8 +-
 server/pom.xml                                  |   7 +
 .../apache/kylin/source/hive/HiveMRInput.java   |   5 +-
 tool/pom.xml                                    |   4 +
 91 files changed, 1299 insertions(+), 462 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 73f9e12..fc49a62 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -47,6 +47,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-spark</artifactId>
+        </dependency>
 
         <!-- Env & Test -->
         <dependency>
@@ -173,7 +177,6 @@
                             <shadedClassifierName>job</shadedClassifierName>
                             <artifactSet>
                                 <excludes>
-                                    <exclude>io.netty:*</exclude>
                                     <exclude>org.apache.zookeeper:*</exclude>
                                     <exclude>net.sf.ehcache:*</exclude>
                                     <exclude>org.apache.httpcomponents:*</exclude>

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/assembly/src/main/config/assemblies/source-assembly.xml
----------------------------------------------------------------------
diff --git a/assembly/src/main/config/assemblies/source-assembly.xml b/assembly/src/main/config/assemblies/source-assembly.xml
index fad45aa..92584b5 100644
--- a/assembly/src/main/config/assemblies/source-assembly.xml
+++ b/assembly/src/main/config/assemblies/source-assembly.xml
@@ -97,9 +97,6 @@ limitations under the License.
                 <exclude>%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/)(.*/)?docs(/.*)?]
                 </exclude>
 
-                <!-- exclude unmaintained -->
-                <exclude>%regex[(?!((?!${project.build.directory}/)))?engine-spark(/.*)?]
-                </exclude>
             </excludes>
         </fileSet>
         <!-- LICENSE, NOTICE, DEPENDENCIES, git.properties, etc. calculated at build time -->

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index 23b3670..f51c208 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -81,7 +81,6 @@ public class DeployUtil {
 
         config().overrideMRJobJarPath(jobJar.getAbsolutePath());
         config().overrideCoprocessorLocalJar(coprocessorJar.getAbsolutePath());
-        config().overrideSparkJobJarPath(getSparkJobJarFile().getAbsolutePath());
     }
 
     private static String getPomVersion() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/build/conf/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-spark-conf.properties b/build/conf/kylin-spark-conf.properties
new file mode 100644
index 0000000..7f53f50
--- /dev/null
+++ b/build/conf/kylin-spark-conf.properties
@@ -0,0 +1,27 @@
+spark.yarn.submit.file.replication=1
+spark.yarn.executor.memoryOverhead=200
+spark.yarn.driver.memoryOverhead=384
+#spark.master=local[4]
+spark.master=yarn
+spark.submit.deployMode=cluster
+spark.eventLog.enabled=true
+spark.yarn.scheduler.heartbeat.interval-ms=5000
+spark.yarn.preserve.staging.files=true
+spark.yarn.queue=default
+spark.yarn.containerLauncherMaxThreads=25
+spark.yarn.max.executor.failures=3
+spark.eventLog.dir=hdfs\:///kylin/spark-history
+spark.history.kerberos.enabled=true
+spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
+spark.history.ui.port=18080
+spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
+spark.executor.memory=1G
+spark.storage.memoryFraction=0.3
+spark.executor.cores=1
+spark.executor.instances=1
+spark.history.kerberos.keytab=none
+spark.history.kerberos.principal=none
+#spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+spark.driver.extraJavaOptions=-Dhdp.version=current
+spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+spark.executor.extraJavaOptions=-Dhdp.version=current

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 610c2af..c0a684c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -668,6 +668,7 @@ abstract public class KylinConfigBase implements Serializable {
         // ref constants in IEngineAware
         r.put(0, "org.apache.kylin.engine.mr.MRBatchCubingEngine");
         r.put(2, "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
+        r.put(4, "org.apache.kylin.engine.spark.SparkBatchCubingEngine2");
         return r;
     }
 
@@ -744,29 +745,32 @@ abstract public class KylinConfigBase implements Serializable {
     // ENGINE.SPARK
     // ============================================================================
 
-    public String getKylinSparkJobJarPath() {
-        final String jobJar = getOptional("kylin.engine.spark.job-jar");
-        if (StringUtils.isNotEmpty(jobJar)) {
-            return jobJar;
-        }
-        String kylinHome = getKylinHome();
-        if (StringUtils.isEmpty(kylinHome)) {
-            return "";
-        }
-        return getFileName(kylinHome + File.separator + "lib", SPARK_JOB_JAR_NAME_PATTERN);
+    public String getSparkHome() {
+        return getRequired("kylin.engine.spark.spark-home");
     }
 
-    public void overrideSparkJobJarPath(String path) {
-        logger.info("override " + "kylin.engine.spark.job-jar" + " to " + path);
-        System.setProperty("kylin.engine.spark.job-jar", path);
+    public String getSparkHadoopConfDir() {
+        return getRequired("kylin.engine.spark.env.hadoop-conf-dir");
     }
 
-    public String getSparkHome() {
-        return getRequired("kylin.engine.spark.spark-home");
+    public String getSparkConfFile() {
+        String conf = getOptional("kylin.engine.spark.properties-file", "conf/kylin-spark-conf.properties");
+        File f = new File(conf);
+        if (f.exists()) {
+            return f.getAbsolutePath();
+        } else {
+            String home = getKylinHome();
+            f = new File(home, conf);
+            if (f.exists()) {
+                return f.getAbsolutePath();
+            }
+        }
+
+        throw new IllegalArgumentException("Spark conf file '" + conf + "' does not exist.");
     }
 
-    public String getSparkMaster() {
-        return getRequired("kylin.engine.spark.spark-master");
+    public String getSparkAdditionalJars() {
+        return getOptional("kylin.engine.spark.additional-jars", "");
     }
 
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 466d9d2..5865cca 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -21,17 +21,21 @@ package org.apache.kylin.common.persistence;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -311,4 +315,35 @@ abstract public class ResourceStore {
         return collector;
     }
 
+    public static String dumpResources(KylinConfig kylinConfig, Collection<String> dumpList) throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
+
+        // write kylin.properties
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        kylinConfig.writeProperties(kylinPropsFile);
+
+        ResourceStore from = ResourceStore.getStore(kylinConfig);
+        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+        ResourceStore to = ResourceStore.getStore(localConfig);
+        for (String path : dumpList) {
+            RawResource res = from.getResource(path);
+            if (res == null)
+                throw new IllegalStateException("No resource found at -- " + path);
+            to.putResource(path, res.inputStream, res.timestamp);
+            res.inputStream.close();
+        }
+
+        String metaDirURI = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath());
+        if (metaDirURI.startsWith("/")) // note Path on windows is like "d:/../..."
+            metaDirURI = "file://" + metaDirURI;
+        else
+            metaDirURI = "file:///" + metaDirURI;
+        logger.info("meta dir is: " + metaDirURI);
+
+        return metaDirURI;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-common/src/main/java/org/apache/kylin/common/util/Array.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Array.java b/core-common/src/main/java/org/apache/kylin/common/util/Array.java
index 7447b46..b25b764 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Array.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Array.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
 /*
  * An array with correct equals(), hashCode(), compareTo() and toString()
  */
-public class Array<T> implements Comparable<Array<T>> {
+public class Array<T> implements Comparable<Array<T>>, java.io.Serializable {
     public T[] data;
 
     public Array(T[] data) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java b/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java
index 8751b78..ae380cb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/SplittedBytes.java
@@ -22,7 +22,7 @@ package org.apache.kylin.common.util;
  * @author George Song (ysong1)
  * 
  */
-public class SplittedBytes {
+public class SplittedBytes implements java.io.Serializable {
     public SplittedBytes(int length) {
         this.value = new byte[length];
         this.length = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index e155f86..e64d7ea 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -51,7 +51,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegment {
+public class CubeSegment implements Comparable<CubeSegment>, IBuildable, ISegment, java.io.Serializable {
 
     @JsonBackReference
     private CubeInstance cubeInstance;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 67f1751..3602fff 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -26,9 +26,11 @@ import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.RowKeyColDesc;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
 import org.apache.kylin.metadata.model.TblColRef;
 
-public class RowKeySplitter {
+public class RowKeySplitter implements java.io.Serializable {
 
     private CubeDesc cubeDesc;
     private RowKeyColumnIO colIO;
@@ -64,7 +66,13 @@ public class RowKeySplitter {
     public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
         this.enableSharding = cubeSeg.isEnableSharding();
         this.cubeDesc = cubeSeg.getCubeDesc();
-        this.colIO = new RowKeyColumnIO(new CubeDimEncMap(cubeSeg));
+        IDimensionEncodingMap dimEncoding = new CubeDimEncMap(cubeSeg);
+
+        for (RowKeyColDesc rowKeyColDesc : cubeDesc.getRowkey().getRowKeyColumns()) {
+            dimEncoding.get(rowKeyColDesc.getColRef());
+        }
+
+        this.colIO = new RowKeyColumnIO(dimEncoding);
 
         this.splitBuffers = new SplittedBytes[splitLen];
         this.splitOffsets = new int[splitLen];

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 7503fbf..dd22d6a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -18,15 +18,10 @@
 
 package org.apache.kylin.cube.cuboid;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
@@ -37,12 +32,16 @@ import org.apache.kylin.cube.model.RowKeyColDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
-public class Cuboid implements Comparable<Cuboid> {
+public class Cuboid implements Comparable<Cuboid>, java.io.Serializable {
 
     private final static Map<String, Map<Long, Cuboid>> CUBOID_CACHE = new ConcurrentHashMap<String, Map<Long, Cuboid>>();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 733aded..ffb0a5e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -21,6 +21,12 @@ package org.apache.kylin.cube.cuboid;
 /**
  */
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.kylin.cube.model.AggregationGroup;
+import org.apache.kylin.cube.model.CubeDesc;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -28,14 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.kylin.cube.model.AggregationGroup;
-import org.apache.kylin.cube.model.CubeDesc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class CuboidScheduler {
+public class CuboidScheduler implements java.io.Serializable {
 
     private final CubeDesc cubeDesc;
     private final long max;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index bfe6eb4..2becde4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.cube.kv;
 
-import java.util.Map;
-
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
@@ -30,12 +28,14 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 /**
  * 
  * @author xjiang
  * 
  */
-public abstract class AbstractRowKeyEncoder {
+public abstract class AbstractRowKeyEncoder implements java.io.Serializable {
 
     protected static final Logger logger = LoggerFactory.getLogger(AbstractRowKeyEncoder.class);
     public static final byte DEFAULT_BLANK_BYTE = DimensionEncoding.NULL;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
index a4d2d6f..bd9554a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
@@ -18,8 +18,7 @@
 
 package org.apache.kylin.cube.kv;
 
-import java.util.Map;
-
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -33,9 +32,9 @@ import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import java.util.Map;
 
-public class CubeDimEncMap implements IDimensionEncodingMap {
+public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializable {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeDimEncMap.class);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
index fbb93db..65911a0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -29,7 +29,7 @@ import org.apache.kylin.metadata.model.TblColRef;
  *
  * @author yangli9
  */
-public class RowKeyColumnIO {
+public class RowKeyColumnIO implements java.io.Serializable {
 
     //private static final Logger logger = LoggerFactory.getLogger(RowKeyColumnIO.class);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index bf20de1..a669fb1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -18,11 +18,7 @@
 
 package org.apache.kylin.cube.kv;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Preconditions;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -32,9 +28,12 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-public class RowKeyEncoder extends AbstractRowKeyEncoder {
+public class RowKeyEncoder extends AbstractRowKeyEncoder implements java.io.Serializable {
 
     private int bodyLength = 0;
     private RowKeyColumnIO colIO;
@@ -42,11 +41,13 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
     protected boolean enableSharding;
     private int uhcOffset = -1;//it's a offset to the beginning of body
     private int uhcLength = -1;
+    private int headerLength;
 
     public RowKeyEncoder(CubeSegment cubeSeg, Cuboid cuboid) {
         super(cubeSeg, cuboid);
         enableSharding = cubeSeg.isEnableSharding();
-        Set<TblColRef> shardByColumns = cubeSeg.getShardByColumns();
+        headerLength = cubeSeg.getRowKeyPreambleSize();
+        Set<TblColRef> shardByColumns = cubeSeg.getCubeDesc().getShardByColumns();
         if (shardByColumns.size() > 1) {
             throw new IllegalStateException("Does not support multiple UHC now");
         }
@@ -61,7 +62,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder {
     }
 
     public int getHeaderLength() {
-        return cubeSeg.getRowKeyPreambleSize();
+        return headerLength;
     }
 
     public int getBytesLength() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java b/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java
index 9bd082f..5f3c92c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/AggregationGroup.java
@@ -35,8 +35,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class AggregationGroup {
-    public static class HierarchyMask {
+public class AggregationGroup implements java.io.Serializable {
+    public static class HierarchyMask implements java.io.Serializable {
         public long fullMask; // 00000111
         public long[] allMasks; // 00000100,00000110,00000111
         public long[] dims; // 00000100,00000010,00000001

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 6088bb4..1993ef1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -100,7 +100,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         LOOKUP, PK_FK, EXTENDED_COLUMN
     }
 
-    public static class DeriveInfo {
+    public static class DeriveInfo implements java.io.Serializable {
         public DeriveType type;
         public JoinDesc join;
         public TblColRef[] columns;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index f37f86e..7516df7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -36,7 +36,7 @@ import com.google.common.collect.Maps;
 
 /**
  */
-public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
+public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc, java.io.Serializable {
 
     protected final String tableName;
     protected final CubeDesc cubeDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 979af76..d072631 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.cube.model;
 
-import java.util.List;
-
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -28,10 +26,12 @@ import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import java.util.List;
+
 /**
  * An enrich of IJoinedFlatTableDesc for cubes
  */
-public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
+public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc, java.io.Serializable {
 
     private CubeDesc cubeDesc;
     private IJoinedFlatTableDesc flatDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
index f471f9f..ca2183a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class DictionaryDesc {
+public class DictionaryDesc implements java.io.Serializable {
 
     @JsonProperty("column")
     private String column;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java
index cd75228..93bf4bd 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/DimensionDesc.java
@@ -35,7 +35,7 @@ import com.google.common.base.Objects;
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class DimensionDesc {
+public class DimensionDesc implements java.io.Serializable {
 
     @JsonProperty("name")
     private String name;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
index fb491f8..7007342 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnDesc.java
@@ -18,19 +18,18 @@
 
 package org.apache.kylin.cube.model;
 
-import java.util.Arrays;
-
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.util.Arrays;
 
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class HBaseColumnDesc {
+public class HBaseColumnDesc implements java.io.Serializable {
 
     @JsonProperty("qualifier")
     private String qualifier;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java
index c5b2e19..85c2c17 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseColumnFamilyDesc.java
@@ -18,18 +18,17 @@
 
 package org.apache.kylin.cube.model;
 
-import java.util.Arrays;
-
-import org.apache.kylin.metadata.model.MeasureDesc;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.util.Arrays;
 
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class HBaseColumnFamilyDesc {
+public class HBaseColumnFamilyDesc implements java.io.Serializable {
 
     @JsonProperty("name")
     private String name;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java
index 2ef1e17..9ad8407 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HBaseMappingDesc.java
@@ -18,22 +18,21 @@
 
 package org.apache.kylin.cube.model;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
 
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class HBaseMappingDesc {
+public class HBaseMappingDesc implements java.io.Serializable {
 
     @JsonProperty("column_family")
     private HBaseColumnFamilyDesc[] columnFamily;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java
index f88d4d2..b72f220 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/HierarchyDesc.java
@@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class HierarchyDesc {
+public class HierarchyDesc implements java.io.Serializable {
 
     @JsonProperty("level")
     private String level;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java
index 3b49323..39bd6c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyColDesc.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
  * 
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class RowKeyColDesc {
+public class RowKeyColDesc implements java.io.Serializable {
 
     @JsonProperty("column")
     private String column;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
index f1a403d..00557c5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/RowKeyDesc.java
@@ -18,24 +18,23 @@
 
 package org.apache.kylin.cube.model;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Objects;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.kylin.metadata.model.TblColRef;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Objects;
-
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class RowKeyDesc {
+public class RowKeyDesc implements java.io.Serializable {
 
     @JsonProperty("rowkey_columns")
     private RowKeyColDesc[] rowkeyColumns;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java
index 63b0fc4..4a6c510 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SelectRule.java
@@ -20,7 +20,7 @@ package org.apache.kylin.cube.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class SelectRule {
+public class SelectRule implements java.io.Serializable {
     @JsonProperty("hierarchy_dims")
     public String[][] hierarchy_dims;
     @JsonProperty("mandatory_dims")

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
index 5502a74..60c9f59 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
@@ -26,7 +26,7 @@ import org.apache.kylin.dict.NumberDictionary.NumberBytesCodec;
  */
 public class NumberDictionaryForestBuilder extends TrieDictionaryForestBuilder<String> {
 
-    public static class Number2BytesConverter implements BytesConverter<String> {
+    public static class Number2BytesConverter implements BytesConverter<String>, java.io.Serializable {
 
         static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = NumberDictionary.MAX_DIGITS_BEFORE_DECIMAL_POINT;
         static final ThreadLocal<NumberBytesCodec> LOCAL = new ThreadLocal<NumberBytesCodec>();

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java b/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java
index 0bec6a1..9107a4c 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/StringBytesConverter.java
@@ -20,7 +20,7 @@ package org.apache.kylin.dict;
 
 import org.apache.kylin.common.util.Bytes;
 
-public class StringBytesConverter implements BytesConverter<String> {
+public class StringBytesConverter implements BytesConverter<String>, java.io.Serializable {
 
     @Override
     public byte[] convertToBytes(String v) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 4351e31..34f9d84 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -18,12 +18,9 @@
 
 package org.apache.kylin.job.execution;
 
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.job.dao.ExecutableDao;
@@ -34,9 +31,11 @@ import org.apache.kylin.job.exception.PersistentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  */
@@ -308,6 +307,10 @@ public class ExecutableManager {
                 }
             }
         }
+
+        if (job.getStatus() == ExecutableState.SUCCEED) {
+            updateJobOutput(job.getId(), ExecutableState.READY, null, null);
+        }
     }
 
     public void pauseJob(String jobId) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java
index ec6347f..a54bcda 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/AbstractDateDimEnc.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.dimension;
 
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -25,10 +29,6 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.metadata.datatype.DataTypeSerializer;
-
 public class AbstractDateDimEnc extends DimensionEncoding {
     private static final long serialVersionUID = 1L;
 
@@ -81,11 +81,9 @@ public class AbstractDateDimEnc extends DimensionEncoding {
     @Override
     public DataTypeSerializer<Object> asDataTypeSerializer() {
         return new DataTypeSerializer<Object>() {
-            // be thread-safe and avoid repeated obj creation
-            private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
 
             private byte[] currentBuf() {
-                byte[] buf = current.get();
+                byte[] buf = (byte[]) current.get();
                 if (buf == null) {
                     buf = new byte[fixedLen];
                     current.set(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
index c3f4c11..75e50a1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/BooleanDimEnc.java
@@ -109,11 +109,9 @@ public class BooleanDimEnc extends DimensionEncoding {
     }
 
     private class BooleanSerializer extends DataTypeSerializer<Object> {
-        // be thread-safe and avoid repeated obj creation
-        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
 
         private byte[] currentBuf() {
-            byte[] buf = current.get();
+            byte[] buf = (byte[]) current.get();
             if (buf == null) {
                 buf = new byte[fixedLen];
                 current.set(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java
index 500b410..48238dc 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/DictionaryDimEnc.java
@@ -39,12 +39,15 @@ public class DictionaryDimEnc extends DimensionEncoding {
     // ============================================================================
 
     // could use a lazy loading trick here, to prevent loading all dictionaries of a segment at once
-    private final Dictionary<String> dict;
-    private final int fixedLen;
+    private Dictionary<String> dict;
+    private int fixedLen;
 
     // used in encode(), when a value does not exist in dictionary
-    private final int roundingFlag;
-    private final byte defaultByte;
+    private int roundingFlag;
+    private byte defaultByte;
+
+    public DictionaryDimEnc() {
+    }
 
     public DictionaryDimEnc(Dictionary<String> dict) {
         this(dict, 0, NULL);
@@ -145,12 +148,18 @@ public class DictionaryDimEnc extends DimensionEncoding {
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
-        throw new UnsupportedOperationException();
+        out.writeInt(fixedLen);
+        out.writeInt(roundingFlag);
+        out.write(defaultByte);
+        out.writeObject(dict);
     }
 
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        throw new UnsupportedOperationException();
+        this.fixedLen = in.readInt();
+        this.roundingFlag = in.readInt();
+        this.defaultByte = in.readByte();
+        this.dict = (Dictionary<String>) in.readObject();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java
index b219766..f7f02a0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenDimEnc.java
@@ -129,11 +129,9 @@ public class FixedLenDimEnc extends DimensionEncoding {
     }
 
     public class FixedLenSerializer extends DataTypeSerializer<Object> {
-        // be thread-safe and avoid repeated obj creation
-        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
 
         private byte[] currentBuf() {
-            byte[] buf = current.get();
+            byte[] buf = (byte[]) current.get();
             if (buf == null) {
                 buf = new byte[fixedLen];
                 current.set(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java
index 83118fc..f90a40e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/FixedLenHexDimEnc.java
@@ -224,11 +224,9 @@ public class FixedLenHexDimEnc extends DimensionEncoding {
     }
 
     public class FixedLenSerializer extends DataTypeSerializer<Object> {
-        // be thread-safe and avoid repeated obj creation
-        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
 
         private byte[] currentBuf() {
-            byte[] buf = current.get();
+            byte[] buf = (byte[]) current.get();
             if (buf == null) {
                 buf = new byte[bytelen];
                 current.set(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java
index f25f2a6..aa954da 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/IntDimEnc.java
@@ -109,11 +109,9 @@ public class IntDimEnc extends DimensionEncoding {
     }
 
     public class IntegerSerializer extends DataTypeSerializer<Object> {
-        // be thread-safe and avoid repeated obj creation
-        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
 
         private byte[] currentBuf() {
-            byte[] buf = current.get();
+            byte[] buf = (byte[]) current.get();
             if (buf == null) {
                 buf = new byte[fixedLen];
                 current.set(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java
index 090dc83..0875a7f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/IntegerDimEnc.java
@@ -135,11 +135,9 @@ public class IntegerDimEnc extends DimensionEncoding {
     }
 
     public class IntegerSerializer extends DataTypeSerializer<Object> {
-        // be thread-safe and avoid repeated obj creation
-        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
 
         private byte[] currentBuf() {
-            byte[] buf = current.get();
+            byte[] buf = (byte[]) current.get();
             if (buf == null) {
                 buf = new byte[fixedLen];
                 current.set(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java b/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java
index 993aac3..c0c52d1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/OneMoreByteVLongDimEnc.java
@@ -18,17 +18,17 @@
 
 package org.apache.kylin.dimension;
 
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.datatype.DataTypeSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * not being used yet, prepared for future
  */
@@ -118,11 +118,9 @@ public class OneMoreByteVLongDimEnc extends DimensionEncoding {
     }
 
     public class VLongSerializer extends DataTypeSerializer<Object> {
-        // be thread-safe and avoid repeated obj creation
-        private ThreadLocal<byte[]> current = new ThreadLocal<byte[]>();
 
         private byte[] currentBuf() {
-            byte[] buf = current.get();
+            byte[] buf = (byte[]) current.get();
             if (buf == null) {
                 buf = new byte[byteLen];
                 current.set(buf);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
index 8a5481c..44e5708 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/BufferedMeasureCodec.java
@@ -18,26 +18,26 @@
 
 package org.apache.kylin.measure;
 
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
 /**
  * This class embeds a reusable byte buffer for measure encoding, and is not thread-safe.
  * The buffer will grow to accommodate BufferOverflowException until a limit.
  * The problem here to solve is some measure type cannot provide accurate DataTypeSerializer.maxLength()
  */
 @SuppressWarnings({ "unchecked" })
-public class BufferedMeasureCodec {
+public class BufferedMeasureCodec implements java.io.Serializable {
     public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024; // 1 MB
     public static final int MAX_BUFFER_SIZE = 1 * 1024 * DEFAULT_BUFFER_SIZE; // 1 GB
 
     final private MeasureCodec codec;
 
-    private ByteBuffer buf;
+    private transient ByteBuffer buf;
     final private int[] measureSizes;
 
     public BufferedMeasureCodec(Collection<MeasureDesc> measureDescs) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
index edaf806..2d73e59 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -18,19 +18,19 @@
 
 package org.apache.kylin.measure;
 
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
 /**
  * @author yangli9
  * 
  */
 @SuppressWarnings({ "rawtypes" })
-public class MeasureCodec {
+public class MeasureCodec implements java.io.Serializable {
 
     private int nMeasures;
     private DataTypeSerializer[] serializers;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 0076252..26b7298 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -18,14 +18,14 @@
 
 package org.apache.kylin.measure;
 
-import java.util.Collection;
-import java.util.Map;
-
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
-abstract public class MeasureIngester<V> {
+import java.util.Collection;
+import java.util.Map;
+
+abstract public class MeasureIngester<V> implements java.io.Serializable {
 
     public static MeasureIngester<?> create(MeasureDesc measure) {
         return measure.getFunction().getMeasureType().newIngester();
@@ -42,6 +42,10 @@ abstract public class MeasureIngester<V> {
 
     abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
 
+    public void reset() {
+
+    }
+
     public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index de1b442..c1b38c6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -18,11 +18,6 @@
 
 package org.apache.kylin.measure;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -32,13 +27,18 @@ import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 /**
  * MeasureType captures how a kind of aggregation is defined, how it is calculated 
  * during cube build, and how it is involved in query and storage scan.
  * 
  * @param <T> the Java type of aggregation data object, e.g. HyperLogLogPlusCounter
  */
-abstract public class MeasureType<T> {
+abstract public class MeasureType<T> implements java.io.Serializable {
 
     /* ============================================================================
      * Define

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index 8356faa..27ee412 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -43,4 +43,9 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
             l.set(Double.parseDouble(values[0]));
         return l;
     }
+
+    @Override
+    public void reset() {
+        current = new DoubleMutable();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
index f276817..29eb787 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
@@ -30,7 +30,8 @@ public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
 
     @Override
     public void reset() {
-        sum.set(0.0);
+//        sum.set(0.0);
+        sum = new DoubleMutable();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index bfe6fe8..7969114 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -43,4 +43,9 @@ public class LongIngester extends MeasureIngester<LongMutable> {
             l.set(Long.parseLong(values[0]));
         return l;
     }
+
+    @Override
+    public void reset() {
+        current = new LongMutable();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
index e7fdc9d..0a56af8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
@@ -30,7 +30,8 @@ public class LongSumAggregator extends MeasureAggregator<LongMutable> {
 
     @Override
     public void reset() {
-        sum.set(0);
+//        sum.set(0);
+        sum = new LongMutable();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
index 827390d..caab094 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapCounter.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.measure.bitmap;
 
+import org.apache.commons.io.IOUtils;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -26,13 +29,10 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
-import org.apache.commons.io.IOUtils;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
-
 /**
  * Created by sunyerui on 15/12/1.
  */
-public class BitmapCounter implements Comparable<BitmapCounter> {
+public class BitmapCounter implements Comparable<BitmapCounter>, java.io.Serializable {
 
     private MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 8e2b2f7..6ad82a1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -138,6 +138,11 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
                 }
                 return retValue;
             }
+
+            @Override
+            public void reset() {
+                current = new BitmapCounter();
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 0e58dca..0ba8aa4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -105,6 +105,11 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> {
                 }
                 return hllc;
             }
+
+            @Override
+            public void reset() {
+                current = new HyperLogLogPlusCounter(dataType.getPrecision());
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
index 4d08b6f..ab0c33e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -18,21 +18,18 @@
 
 package org.apache.kylin.measure.hllc;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 /**
  * @author yangli9
  * 
  */
 public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
-
     private int precision;
 
     public HLLCSerializer(DataType type) {
@@ -49,7 +46,7 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
     }
 
     private HyperLogLogPlusCounter current() {
-        HyperLogLogPlusCounter hllc = current.get();
+        HyperLogLogPlusCounter hllc = (HyperLogLogPlusCounter) current.get();
         if (hllc == null) {
             hllc = new HyperLogLogPlusCounter(precision);
             current.set(hllc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
index 021c146..68a0273 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawSerializer.java
@@ -34,13 +34,11 @@ public class RawSerializer extends DataTypeSerializer<List<ByteArray>> {
     //FIXME to config this and RowConstants.ROWVALUE_BUFFER_SIZE in properties file
     public static final int RAW_BUFFER_SIZE = 1024 * 1024;//1M
 
-    private ThreadLocal<List<ByteArray>> current = new ThreadLocal<>();
-
     public RawSerializer(DataType dataType) {
     }
 
     private List<ByteArray> current() {
-        List<ByteArray> l = current.get();
+        List<ByteArray> l = (List<ByteArray>) current.get();
         if (l == null) {
             l = new ArrayList<ByteArray>();
             current.set(l);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index caf7961..5e4b91e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -40,7 +40,7 @@ import com.google.common.collect.Maps;
  *
  * @param <T> type of data in the stream to be summarized
  */
-public class TopNCounter<T> implements Iterable<Counter<T>> {
+public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializable {
 
     public static final int EXTRA_SPACE_RATE = 50;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index c29af6c..8c8b5a6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Dictionary;
@@ -46,8 +47,6 @@ import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
 
     private static final Logger logger = LoggerFactory.getLogger(TopNMeasureType.class);
@@ -156,6 +155,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
                 return topNCounter;
             }
 
+
             @Override
             public TopNCounter<ByteArray> reEncodeDictionary(TopNCounter<ByteArray> value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
                 TopNCounter<ByteArray> topNCounter = value;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
index 64968b8..b5043f5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
@@ -35,7 +35,7 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
     private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
 
     final DataType type;
-    transient final int maxLength;
+    final int maxLength;
 
     transient int avoidVerbose = 0;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
index acb6de1..998e5e9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BooleanSerializer.java
@@ -18,18 +18,15 @@
 
 package org.apache.kylin.metadata.datatype;
 
-import java.nio.ByteBuffer;
-
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.BooleanUtils;
 
+import java.nio.ByteBuffer;
+
 public class BooleanSerializer extends DataTypeSerializer<LongMutable> {
 
     public final static String[] TRUE_VALUE_SET = { "true", "t", "on", "yes" };
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
-
     public BooleanSerializer(DataType type) {
     }
 
@@ -39,7 +36,7 @@ public class BooleanSerializer extends DataTypeSerializer<LongMutable> {
     }
 
     private LongMutable current() {
-        LongMutable l = current.get();
+        LongMutable l = (LongMutable) current.get();
         if (l == null) {
             l = new LongMutable();
             current.set(l);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
index a739377..a4a35a4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -18,19 +18,21 @@
 
 package org.apache.kylin.metadata.datatype;
 
-import java.nio.ByteBuffer;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.util.BytesSerializer;
 
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
 
 /**
  * Note: the implementations MUST be thread-safe.
  */
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T>, java.io.Serializable {
 
     final static Map<String, Class<?>> implementations = Maps.newHashMap();
+    protected transient ThreadLocal current = new ThreadLocal();
     static {
         implementations.put("char", StringSerializer.class);
         implementations.put("varchar", StringSerializer.class);
@@ -94,4 +96,9 @@ abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
         else
             return value.toString();
     }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        current = new ThreadLocal();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
index 07f98b3..a5719bd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
@@ -18,14 +18,11 @@
 
 package org.apache.kylin.metadata.datatype;
 
-import java.nio.ByteBuffer;
-
 import org.apache.kylin.common.util.DateFormat;
 
-public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+import java.nio.ByteBuffer;
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
 
     public DateTimeSerializer(DataType type) {
     }
@@ -36,7 +33,7 @@ public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
     }
 
     private LongMutable current() {
-        LongMutable l = current.get();
+        LongMutable l = (LongMutable) current.get();
         if (l == null) {
             l = new LongMutable();
             current.set(l);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
index 976dc51..cda4ff5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
@@ -24,9 +24,6 @@ import java.nio.ByteBuffer;
  */
 public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
-
     public DoubleSerializer(DataType type) {
     }
 
@@ -36,7 +33,7 @@ public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
     }
 
     private DoubleMutable current() {
-        DoubleMutable d = current.get();
+        DoubleMutable d = (DoubleMutable) current.get();
         if (d == null) {
             d = new DoubleMutable();
             current.set(d);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
index 7b95505..c726ab6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Int4Serializer.java
@@ -18,17 +18,14 @@
 
 package org.apache.kylin.metadata.datatype;
 
-import java.nio.ByteBuffer;
-
 import org.apache.kylin.common.util.BytesUtil;
 
+import java.nio.ByteBuffer;
+
 /**
  */
 public class Int4Serializer extends DataTypeSerializer<IntMutable> {
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<IntMutable> current = new ThreadLocal<IntMutable>();
-
     public Int4Serializer(DataType type) {
     }
 
@@ -38,7 +35,7 @@ public class Int4Serializer extends DataTypeSerializer<IntMutable> {
     }
 
     private IntMutable current() {
-        IntMutable l = current.get();
+        IntMutable l = (IntMutable) current.get();
         if (l == null) {
             l = new IntMutable();
             current.set(l);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
index fa333b2..186cdb9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/Long8Serializer.java
@@ -18,17 +18,14 @@
 
 package org.apache.kylin.metadata.datatype;
 
-import java.nio.ByteBuffer;
-
 import org.apache.kylin.common.util.BytesUtil;
 
+import java.nio.ByteBuffer;
+
 /**
  */
 public class Long8Serializer extends DataTypeSerializer<LongMutable> {
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
-
     public Long8Serializer(DataType type) {
     }
 
@@ -38,7 +35,7 @@ public class Long8Serializer extends DataTypeSerializer<LongMutable> {
     }
 
     private LongMutable current() {
-        LongMutable l = current.get();
+        LongMutable l = (LongMutable) current.get();
         if (l == null) {
             l = new LongMutable();
             current.set(l);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
index 9306a70..d8f3f37 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
@@ -18,17 +18,14 @@
 
 package org.apache.kylin.metadata.datatype;
 
-import java.nio.ByteBuffer;
-
 import org.apache.kylin.common.util.BytesUtil;
 
+import java.nio.ByteBuffer;
+
 /**
  */
 public class LongSerializer extends DataTypeSerializer<LongMutable> {
 
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
-
     public LongSerializer(DataType type) {
     }
 
@@ -38,7 +35,7 @@ public class LongSerializer extends DataTypeSerializer<LongMutable> {
     }
 
     private LongMutable current() {
-        LongMutable l = current.get();
+        LongMutable l = (LongMutable) current.get();
         if (l == null) {
             l = new LongMutable();
             current.set(l);


[2/3] kylin git commit: spark cubing init commit

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index ac13f40..2d3d906 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,27 +18,27 @@
 
 package org.apache.kylin.metadata.model;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
 
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class FunctionDesc {
+public class FunctionDesc implements Serializable {
 
     public static FunctionDesc newInstance(String expression, ParameterDesc param, String returnType) {
         FunctionDesc r = new FunctionDesc();

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
index 6489244..dd1500b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
@@ -18,17 +18,18 @@
 
 package org.apache.kylin.metadata.model;
 
-import java.util.Arrays;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
+import java.io.Serializable;
+import java.util.Arrays;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinDesc {
+public class JoinDesc implements Serializable {
 
     // inner, left, right, outer...
     @JsonProperty("type")

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
index 5d0409a..51e5787 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
@@ -25,8 +25,10 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinTableDesc {
+public class JoinTableDesc implements Serializable {
 
     @JsonProperty("table")
     private String table;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index a0b267d..c132d0e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -18,15 +18,16 @@
 
 package org.apache.kylin.metadata.model;
 
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
-
-public class JoinsTree {
+public class JoinsTree  implements Serializable {
 
     final Map<String, Chain> tableChains = new LinkedHashMap<>();
 
@@ -111,7 +112,7 @@ public class JoinsTree {
             return chain.join;
     }
 
-    static class Chain {
+    static class Chain implements java.io.Serializable {
         TableRef table; // pk side
         JoinDesc join;
         Chain fkSide;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 253b06b..c0719d2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -18,18 +18,19 @@
 
 package org.apache.kylin.metadata.model;
 
-import java.util.Objects;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+import java.util.Objects;
+
 /**
  */
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class MeasureDesc {
+public class MeasureDesc implements Serializable {
 
     @JsonProperty("name")
     private String name;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
index 6460f71..3c5c5f1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
@@ -18,17 +18,17 @@
 
 package org.apache.kylin.metadata.model;
 
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.util.StringUtil;
+
+import java.io.Serializable;
+import java.util.List;
 
 /**
  */
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ModelDimensionDesc {
+public class ModelDimensionDesc implements Serializable {
     @JsonProperty("table")
     private String table;
     @JsonProperty("columns")

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index c14d061..8ad20a8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,21 +18,22 @@
 
 package org.apache.kylin.metadata.model;
 
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ParameterDesc {
+public class ParameterDesc implements Serializable {
 
     public static ParameterDesc newInstance(Object... objs) {
         if (objs.length == 0)

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 9925990..c6e6425 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -27,10 +27,12 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class PartitionDesc {
+public class PartitionDesc implements Serializable {
 
     public static enum PartitionType {
         APPEND, //
@@ -175,7 +177,7 @@ public class PartitionDesc {
         String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive);
     }
 
-    public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder {
+    public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, java.io.Serializable {
 
         @Override
         public String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index 7089eba..0d9b442 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -18,25 +18,28 @@
 
 package org.apache.kylin.metadata.model;
 
+import com.google.common.collect.Maps;
+
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
+public class TableRef implements Serializable {
 
-public class TableRef {
-
-    final private DataModelDesc model;
+    final transient private DataModelDesc model;
     final private String alias;
     final private TableDesc table;
     final private Map<String, TblColRef> columns;
+    final private String modelName;
 
     TableRef(DataModelDesc model, String alias, TableDesc table) {
         this.model = model;
+        this.modelName = model.getName();
         this.alias = alias;
         this.table = table;
         this.columns = Maps.newLinkedHashMap();
-        
+
         for (ColumnDesc col : table.getColumns()) {
             columns.put(col.getName(), new TblColRef(this, col));
         }
@@ -94,7 +97,7 @@ public class TableRef {
 
         TableRef t = (TableRef) o;
 
-        if ((model == null ? t.model == null : model.getName().equals(t.model.getName())) == false)
+        if ((modelName == null ? t.modelName == null : modelName.equals(t.modelName)) == false) // a null model name matches only another null
             return false;
         if ((alias == null ? t.alias == null : alias.equals(t.alias)) == false)
             return false;
@@ -107,7 +110,7 @@ public class TableRef {
     @Override
     public int hashCode() {
         int result = 0;
-        result = 31 * result + model.getName().hashCode();
+        result = 31 * result + modelName.hashCode();
         result = 31 * result + alias.hashCode();
         result = 31 * result + table.getIdentity().hashCode();
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 6eba3c2..0da2a8e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -32,6 +32,7 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +77,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+    protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
         final int groupRowkeyColumnsCount = ((CubeSegment) seg).getCubeDesc().getBuildLevel();
         final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
@@ -100,7 +101,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
+    protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
         // base cuboid job
         MapReduceExecutable cubeStep = new MapReduceExecutable();
 
@@ -117,7 +118,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(getInMemCuboidJob());
-        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+//        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
         return cubeStep;
     }
 
@@ -144,7 +145,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+//        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return baseCuboidStep;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 47eb9c3..f070b7f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -75,6 +75,7 @@ public class JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
         result.setMapReduceParams(cmd.toString());
+        result.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return result;
     }
 
@@ -194,7 +195,7 @@ public class JobBuilderSupport {
         return buf.append(" -").append(paraName).append(" ").append(paraValue);
     }
 
-    public String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+    public static String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
         String[] paths = new String[groupRowkeyColumnsCount + 1];
         for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
             int dimNum = totalRowkeyColumnCount - i;
@@ -207,4 +208,13 @@ public class JobBuilderSupport {
         return paths;
     }
 
+    public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
+        if (level == 0) {
+            return cuboidRootPath + "base_cuboid";
+        } else {
+            return cuboidRootPath + level + "level_cuboid";
+        }
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
new file mode 100644
index 0000000..8d8496c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+public class BaseCuboidBuilder implements java.io.Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
+    public static final String HIVE_NULL = "\\N";
+    public static final byte[] ONE = Bytes.toBytes("1");
+    protected String cubeName;
+    protected Cuboid baseCuboid;
+    protected CubeDesc cubeDesc;
+    protected CubeSegment cubeSegment;
+    protected Set<String> nullStrs;
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+    protected MeasureIngester<?>[] aggrIngesters;
+    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
+    protected AbstractRowKeyEncoder rowKeyEncoder;
+    protected BufferedMeasureCodec measureCodec;
+
+    protected KylinConfig kylinConfig;
+
+    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc,
+                             AbstractRowKeyEncoder rowKeyEncoder, BufferedMeasureCodec measureCodec, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        this.kylinConfig = kylinConfig;
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.intermediateTableDesc = intermediateTableDesc;
+        this.rowKeyEncoder = rowKeyEncoder;
+        this.measureCodec = measureCodec;
+        this.aggrIngesters = aggrIngesters;
+        this.dictionaryMap = dictionaryMap;
+
+        init();
+    }
+
+    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) {
+        this.kylinConfig = kylinConfig;
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.intermediateTableDesc = intermediateTableDesc;
+
+        // init() must run first: it assigns baseCuboid, which the row key encoder below is created from.
+        init();
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+        measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+        dictionaryMap = cubeSegment.buildDictionaryMap();
+    }
+
+    private void init() {
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+        initNullBytes();
+    }
+
+    private void initNullBytes() {
+        nullStrs = Sets.newHashSet();
+        nullStrs.add(HIVE_NULL);
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullStrs.add(s);
+            }
+        }
+    }
+
+    protected boolean isNull(String v) {
+        return nullStrs.contains(v);
+    }
+
+    public byte[] buildKey(String[] flatRow) {
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        List<TblColRef> columns = baseCuboid.getColumns();
+        String[] colValues = new String[columns.size()];
+        for (int i = 0; i < columns.size(); i++) {
+            colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
+        }
+        return rowKeyEncoder.encode(colValues);
+    }
+
+    public ByteBuffer buildValue(String[] flatRow) {
+        return measureCodec.encode(buildValueObjects(flatRow));
+    }
+
+    public Object[] buildValueObjects(String[] flatRow) {
+        Object[] measures = new Object[cubeDesc.getMeasures().size()];
+        for (int i = 0; i < measures.length; i++) {
+            measures[i] = buildValueOf(i, flatRow);
+        }
+
+        return measures;
+    }
+
+    public void resetAggrs() {
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            aggrIngesters[i].reset();
+        }
+    }
+
+    private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
+        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+        FunctionDesc function = measure.getFunction();
+        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+        int paramCount = function.getParameterCount();
+        String[] inputToMeasure = new String[paramCount];
+
+        // pick up parameter values
+        ParameterDesc param = function.getParameter();
+        int colParamIdx = 0; // index among parameters of column type
+        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+            String value;
+            if (function.isCount()) {
+                value = "1";
+            } else if (param.isColumnType()) {
+                value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
+            } else {
+                value = param.getValue();
+            }
+            inputToMeasure[i] = value;
+        }
+
+        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+    }
+
+    private String getCell(int i, String[] flatRow) {
+        if (isNull(flatRow[i]))
+            return null;
+        else
+            return flatRow[i];
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
new file mode 100644
index 0000000..4e98618
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ */
+public class NDCuboidBuilder implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(NDCuboidBuilder.class);
+    protected String cubeName;
+    protected String segmentID;
+    protected CubeSegment cubeSegment;
+    private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
+    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+    public NDCuboidBuilder(CubeSegment cubeSegment) {
+        this.cubeSegment = cubeSegment;
+        this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+        this.rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
+    }
+
+    public NDCuboidBuilder(CubeSegment cubeSegment, RowKeyEncoderProvider rowKeyEncoderProvider) {
+        this.cubeSegment = cubeSegment;
+        this.rowKeyEncoderProvider = rowKeyEncoderProvider;
+        this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+    }
+
+
+    public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
+
+        int offset = 0;
+
+        // rowkey columns
+        long mask = Long.highestOneBit(parentCuboid.getId());
+        long parentCuboidId = parentCuboid.getId();
+        long childCuboidId = childCuboid.getId();
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
+        int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & parentCuboidId) > 0) {// if the this bit position equals
+                // 1
+                if ((mask & childCuboidId) > 0) {// if the child cuboid has this
+                    // column
+                    System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
+                    offset += splitBuffers[index].length;
+                }
+                index++;
+            }
+            mask = mask >> 1;
+        }
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
+        }
+        newKeyBuf.set(0, fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+        return new Pair<>(Integer.valueOf(fullKeySize), newKeyBuf);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 7b719e0..d08e29a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -18,38 +18,25 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 /**
  */
@@ -59,131 +46,37 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     public static final byte[] ONE = Bytes.toBytes("1");
     protected String cubeName;
     protected String segmentID;
-    protected Cuboid baseCuboid;
     protected CubeInstance cube;
     protected CubeDesc cubeDesc;
     protected CubeSegment cubeSegment;
-    protected Set<String> nullStrs;
-    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
-    protected String intermediateTableRowDelimiter;
-    protected byte byteRowDelimiter;
     protected int counter;
-    protected MeasureIngester<?>[] aggrIngesters;
-    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
     protected Object[] measures;
-    protected AbstractRowKeyEncoder rowKeyEncoder;
-    protected BufferedMeasureCodec measureCodec;
     private int errorRecordCounter;
     protected Text outputKey = new Text();
     protected Text outputValue = new Text();
 
+    private BaseCuboidBuilder baseCuboidBuilder;
+
     @Override
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
         segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
-        if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
-            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
-        }
-
-        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cube = CubeManager.getInstance(config).getCube(cubeName);
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         cubeDesc = cube.getDescriptor();
         cubeSegment = cube.getSegmentById(segmentID);
+        CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+        baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc);
 
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
-        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
-        measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-        measures = new Object[cubeDesc.getMeasures().size()];
-
-        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-        dictionaryMap = cubeSegment.buildDictionaryMap();
-
-        initNullBytes();
-    }
-
-    private void initNullBytes() {
-        nullStrs = Sets.newHashSet();
-        nullStrs.add(HIVE_NULL);
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullStrs.add(s);
-            }
-        }
-    }
-
-    protected boolean isNull(String v) {
-        return nullStrs.contains(v);
-    }
-
-    protected byte[] buildKey(String[] flatRow) {
-        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-        List<TblColRef> columns = baseCuboid.getColumns();
-        String[] colValues = new String[columns.size()];
-        for (int i = 0; i < columns.size(); i++) {
-            colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
-        }
-        return rowKeyEncoder.encode(colValues);
     }
 
-    private ByteBuffer buildValue(String[] flatRow) {
-
-        for (int i = 0; i < measures.length; i++) {
-            measures[i] = buildValueOf(i, flatRow);
-        }
-
-        return measureCodec.encode(measures);
-    }
-
-    private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
-        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
-        FunctionDesc function = measure.getFunction();
-        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
-        int paramCount = function.getParameterCount();
-        String[] inputToMeasure = new String[paramCount];
-
-        // pick up parameter values
-        ParameterDesc param = function.getParameter();
-        int colParamIdx = 0; // index among parameters of column type
-        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
-            String value;
-            if (function.isCount()) {
-                value = "1";
-            } else if (param.isColumnType()) {
-                value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
-            } else {
-                value = param.getValue();
-            }
-            inputToMeasure[i] = value;
-        }
-
-        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
-    }
-
-    private String getCell(int i, String[] flatRow) {
-        if (isNull(flatRow[i]))
-            return null;
-        else
-            return flatRow[i];
-    }
 
     protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
-        byte[] rowKey = buildKey(flatRow);
+        byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
         outputKey.set(rowKey, 0, rowKey.length);
 
-        ByteBuffer valueBuf = buildValue(flatRow);
+        ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
         context.write(outputKey, outputValue);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 01cdd4a..b924edc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -18,29 +18,27 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.util.Collection;
-
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
+
 /**
  * @author George Song (ysong1)
  * 
@@ -59,10 +57,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private int handleCounter;
     private int skipCounter;
 
-    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
-    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
     private RowKeySplitter rowKeySplitter;
-    private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+    private NDCuboidBuilder ndCuboidBuilder;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -76,48 +73,13 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         cubeSegment = cube.getSegmentById(segmentID);
         cubeDesc = cube.getDescriptor();
-
+        ndCuboidBuilder = new NDCuboidBuilder(cubeSegment);
         // initialize CubiodScheduler
         cuboidScheduler = new CuboidScheduler(cubeDesc);
-
         rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
-        rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
     }
 
-    private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
-        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
-
-        int offset = 0;
-
-        // rowkey columns
-        long mask = Long.highestOneBit(parentCuboid.getId());
-        long parentCuboidId = parentCuboid.getId();
-        long childCuboidId = childCuboid.getId();
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
-        int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parentCuboidId) > 0) {// if the this bit position equals
-                                                  // 1
-                if ((mask & childCuboidId) > 0) {// if the child cuboid has this
-                                                     // column
-                    System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
-                    offset += splitBuffers[index].length;
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
 
-        int fullKeySize = rowkeyEncoder.getBytesLength();
-        while (newKeyBuf.array().length < fullKeySize) {
-            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
-        }
-        newKeyBuf.set(0, fullKeySize);
-
-        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
-
-        return fullKeySize;
-    }
 
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
@@ -143,8 +105,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         for (Long child : myChildren) {
             Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-            int fullKeySize = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
-            outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+            Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+            outputKey.set(result.getSecond().array(), 0, result.getFirst());
             context.write(outputKey, value);
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index a7cffdd..264f4c9 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -47,6 +47,11 @@
             <artifactId>kylin-core-job</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-mr</artifactId>
+        </dependency>
+
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
new file mode 100644
index 0000000..a7a4151
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/** Batch cubing engine whose cubing job is assembled by {@link SparkBatchCubingJobBuilder2}.
+ * Only {@code createBatchCubingJob} is overridden here; everything else comes from {@link MRBatchCubingEngine2}. */
+public class SparkBatchCubingEngine2 extends MRBatchCubingEngine2 {
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new SparkBatchCubingJobBuilder2(newSegment, submitter).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
new file mode 100644
index 0000000..57b6432
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cubing job builder whose in-mem cubing step runs {@link SparkCubingV3} through a {@link SparkExecutable}.
+ */
+public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
+
+    private static final Logger logger = LoggerFactory.getLogger(SparkBatchCubingJobBuilder2.class);
+
+    public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+    }
+
+    protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+        // empty body: this builder adds no layer cubing steps (NOTE(review): presumably overrides a parent hook; no @Override present - confirm)
+    }
+    /** Creates the Spark step: passes cube name, segment id, flat table name, conf path and output path to {@link SparkCubingV3}. */
+    @Override
+    protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
+        IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        sparkExecutable.setClassName(SparkCubingV3.class.getName());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_CONF_PATH.getOpt(), "/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/"); // FIXME: hard-coded developer-local path; must be replaced by a configurable location
+        sparkExecutable.setParam(SparkCubingV3.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+
+        StringBuilder jars = new StringBuilder();
+        // extra jars for the Spark job, located through classes they contain; findJar yields "" for a class that cannot be loaded
+        StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration")); // htrace-core.jar
+        StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration"));
+        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.client.HConnection")); // hbase-client.jar
+        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.HBaseConfiguration")); // hbase-common.jar
+        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.util.ByteStringer")); // hbase-protocol.jar
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+        //        sparkExecutable.setJars("/Users/shishaofeng/.m2/repository/org/cloudera/htrace/htrace-core/2.01/htrace-core-2.01.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-protocol/0.98.8-hadoop2/hbase-protocol-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-common/0.98.8-hadoop2/hbase-common-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2//repository/org/apache/hbase/hbase-client/0.98.8-hadoop2/hbase-client-0.98.8-hadoop2.jar");
+
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE + " with Spark");
+        return sparkExecutable;
+
+    }
+    /** Returns what {@code ClassUtil.findContainingJar} reports for the named class, or "" (after logging an error) if the class cannot be loaded. */
+    private String findJar(String className) {
+        try {
+            return ClassUtil.findContainingJar(Class.forName(className));
+        } catch (ClassNotFoundException e) {
+            logger.error("failed to locate jar for class " + className, e);
+        }
+
+        return ""; // class not on the classpath: contribute an empty entry instead of failing
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 10c74f3..15cd65c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -83,6 +83,7 @@ import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -153,6 +154,20 @@ public class SparkCubing extends AbstractApplication {
         return options;
     }
 
+    public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException { // returns the environment KylinConfig, first pointing KYLIN_CONF and the metadata URL at folder when needed
+        File metaDir = new File(folder);
+        if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) { // KYLIN_CONF is unset or names a different directory
+            System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+            logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); // obtained only after the system property has been updated
+            System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath());
+            kylinConfig.setMetadataUrl(metaDir.getAbsolutePath()); // the same directory also serves as the metadata URL
+            return kylinConfig;
+        } else { // KYLIN_CONF already equals this folder: return the environment config without touching it
+            return KylinConfig.getInstanceFromEnv();
+        }
+    }
+
     private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
         ClassUtil.addClasspath(confPath);
         final File[] files = new File(confPath).listFiles(new FileFilter() {
@@ -462,7 +477,7 @@ public class SparkCubing extends AbstractApplication {
         }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
     }
 
-    private static void prepare() throws Exception {
+    public static void prepare() throws Exception {
         final File file = new File(SparkFiles.get("kylin.properties"));
         final String confPath = file.getParentFile().getAbsolutePath();
         System.out.println("conf directory:" + confPath);
@@ -526,12 +541,18 @@ public class SparkCubing extends AbstractApplication {
         }
     }
 
-    private Collection<String> getKyroClasses() {
+    public static Collection<String> getKyroClasses() {
         Set<Class> kyroClasses = Sets.newHashSet();
         kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class));
         kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class));
         kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class));
         kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class));
+
         kyroClasses.add(HashMap.class);
         kyroClasses.add(org.apache.spark.sql.Row[].class);
         kyroClasses.add(org.apache.spark.sql.Row.class);
@@ -541,11 +562,15 @@ public class SparkCubing extends AbstractApplication {
         kyroClasses.add(org.apache.spark.sql.types.StructField.class);
         kyroClasses.add(org.apache.spark.sql.types.DateType$.class);
         kyroClasses.add(org.apache.spark.sql.types.Metadata.class);
-        kyroClasses.add(Object[].class);
         kyroClasses.add(org.apache.spark.sql.types.StringType$.class);
         kyroClasses.add(Hashing.murmur3_128().getClass());
-        kyroClasses.add(org.apache.spark.sql.columnar.CachedBatch.class);
+        kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class);
+        kyroClasses.add(Object[].class);
+        kyroClasses.add(int[].class);
+        kyroClasses.add(byte[].class);
         kyroClasses.add(byte[][].class);
+        kyroClasses.add(String[].class);
+        kyroClasses.add(String[][].class);
         kyroClasses.add(org.apache.spark.sql.types.Decimal.class);
         kyroClasses.add(scala.math.BigDecimal.class);
         kyroClasses.add(java.math.BigDecimal.class);
@@ -553,6 +578,52 @@ public class SparkCubing extends AbstractApplication {
         kyroClasses.add(java.math.RoundingMode.class);
         kyroClasses.add(java.util.ArrayList.class);
         kyroClasses.add(java.util.LinkedList.class);
+        kyroClasses.add(java.util.HashSet.class);
+        kyroClasses.add(java.util.LinkedHashSet.class);
+        kyroClasses.add(java.util.LinkedHashMap.class);
+        kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class);
+
+        kyroClasses.add(java.util.HashMap.class);
+        kyroClasses.add(java.util.Properties.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class);
+        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class);
+        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class);
+        kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
+        kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class);
+        kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
+        kyroClasses.add(org.apache.kylin.common.util.Array.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.Segments.class);
+        kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class);
+        kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class);
+        kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class);
+        kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class);
+        kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class);
+        kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class);
+        kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class);
+        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class);
+        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class);
+        kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class);
+        kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class);
+        kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class);
 
         ArrayList<String> result = Lists.newArrayList();
         for (Class kyroClass : kyroClasses) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java
new file mode 100644
index 0000000..6f2915a
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingV3.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.hive.HiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses;
+
+/** Spark application that builds the cuboids of one cube segment from its Hive intermediate table.
+ */
+public class SparkCubingV3 extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(SparkCubingV3.class); // log under this class's own name
+
+    public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath");
+    public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
+
+    private Options options;
+
+    public SparkCubingV3() { // registers the five command-line options, each declared with isRequired(true)
+        options = new Options();
+        options.addOption(OPTION_INPUT_PATH); // "hiveTable"
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID); // "segmentId"
+        options.addOption(OPTION_CONF_PATH); // "confPath"
+        options.addOption(OPTION_OUTPUT_PATH);
+    }
+
+    @Override
+    protected Options getOptions() { // exposes the option set assembled in the constructor
+        return options;
+    }
+
+    /** Adds confPath to the local classpath and registers its *.xml / *.properties files with the Spark context via addFile. */
+    private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
+        ClassUtil.addClasspath(confPath);
+        final File[] files = new File(confPath).listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                // only configuration files are shipped; everything else in the directory is ignored
+                final String path = pathname.getAbsolutePath();
+                return path.endsWith(".xml") || path.endsWith(".properties");
+            }
+        });
+        if (files == null) { // listFiles() returns null when confPath is not a directory or cannot be read
+            throw new IllegalArgumentException("Cannot list configuration directory: " + confPath);
+        }
+        for (File file : files) {
+            sc.addFile(file.getAbsolutePath());
+        }
+    }
+
+
+    private static final void prepare() { // points KYLIN_CONF and the classpath at the directory holding the Spark-distributed kylin.properties
+        final File file = new File(SparkFiles.get("kylin.properties")); // location of kylin.properties as resolved by SparkFiles
+        final String confPath = file.getParentFile().getAbsolutePath(); // its parent directory is used as the conf directory
+        System.out.println("conf directory:" + confPath);
+        System.setProperty(KylinConfig.KYLIN_CONF, confPath); // presumably read later by KylinConfig.getInstanceFromEnv() - confirm
+        ClassUtil.addClasspath(confPath);
+    }
+
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
+        final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+        SparkConf conf = new SparkConf().setAppName("Cubing Application");
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrationRequired", "true");
+        final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() {
+            @Override
+            public boolean apply(@Nullable String input) {
+                return input != null && input.trim().length() > 0;
+            }
+        });
+        conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ","));
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        setupClasspath(sc, confPath);
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+
+        HiveContext sqlContext = new HiveContext(sc.sc());
+        final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);
+
+        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
+        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+
+        final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc);
+        final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment);
+        final Broadcast<BufferedMeasureCodec> vCodec = sc.broadcast(new BufferedMeasureCodec(cubeDesc.getMeasures()));
+        NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(cubeSegment));
+
+        final Broadcast<NDCuboidBuilder> vNDCuboidBuilder = sc.broadcast(ndCuboidBuilder);
+        final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue()));
+
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+        final int measureNum = cubeDesc.getMeasures().size();
+        final BaseCuboidBuilder baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), intermediateTableDesc,
+                AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid),
+                vCodec.getValue(), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap());
+
+        boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()];
+        boolean allNormalMeasure = true;
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
+            allNormalMeasure = allNormalMeasure && needAggr[i];
+        }
+        logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure);
+
+        // encode with dimension encoding, transform to <byte[], Object[]> RDD
+        final JavaPairRDD<byte[], Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, byte[], Object[]>() {
+            @Override
+            public Tuple2<byte[], Object[]> call(Row row) throws Exception {
+                String[] rowArray = rowToArray(row);
+                baseCuboidBuilder.resetAggrs();
+                byte[] rowKey = baseCuboidBuilder.buildKey(rowArray);
+                Object[] result = baseCuboidBuilder.buildValueObjects(rowArray);
+                return new Tuple2<>(rowKey, result);
+            }
+
+            private String[] rowToArray(Row row) {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
+                    }
+                }
+                return result;
+            }
+
+        });
+
+
+        final CuboidReducerFunction2 reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue());
+        CuboidReducerFunction2 baseCuboidReducerFunction = reducerFunction2;
+        if (allNormalMeasure == false) {
+            baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue(), needAggr);
+        }
+
+        // aggregate to calculate base cuboid
+        final JavaPairRDD<byte[], Object[]> baseCuboidRDD = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction);
+        persistent(baseCuboidRDD, vCodec.getValue(), outputPath, 0, sc.hadoopConfiguration());
+
+        // aggregate to ND cuboids
+        final int totalLevels = cubeDesc.getBuildLevel();
+
+        JavaPairRDD<byte[], Object[]> parentRDD = baseCuboidRDD;
+        for (int level = 1; level <= totalLevels; level++) {
+            JavaPairRDD<byte[], Object[]> childRDD = parentRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<byte[], Object[]>, byte[], Object[]>() {
+
+                transient boolean initialized = false;
+
+                RowKeySplitter rowKeySplitter = new RowKeySplitter(vCubeSegment.getValue(), 65, 256);
+
+                @Override
+                public Iterable<Tuple2<byte[], Object[]>> call(Tuple2<byte[], Object[]> tuple2) throws Exception {
+                    if (initialized == false) {
+                        prepare();
+                        initialized = true;
+                    }
+
+                    List<Tuple2<byte[], Object[]>> tuples = Lists.newArrayList();
+                    byte[] key = tuple2._1();
+                    long cuboidId = rowKeySplitter.split(key);
+                    Cuboid parentCuboid = Cuboid.findById(vCubeDesc.getValue(), cuboidId);
+
+                    Collection<Long> myChildren = vCuboidScheduler.getValue().getSpanningCuboid(cuboidId);
+
+                    // if still empty or null
+                    if (myChildren == null || myChildren.size() == 0) {
+                        return tuples;
+                    }
+
+                    for (Long child : myChildren) {
+                        Cuboid childCuboid = Cuboid.findById(vCubeDesc.getValue(), child);
+                        Pair<Integer, ByteArray> result = vNDCuboidBuilder.getValue().buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+
+                        byte[] newKey = new byte[result.getFirst()];
+                        System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst());
+
+                        tuples.add(new Tuple2<>(newKey, tuple2._2()));
+                    }
+
+                    return tuples;
+                }
+            }).reduceByKey(reducerFunction2);
+
+            // persist the RDD to HDFS
+            persistent(childRDD, vCodec.getValue(), outputPath, level, sc.hadoopConfiguration());
+            parentRDD = childRDD;
+        }
+
+        logger.info("Finished on calculating all level cuboids.");
+
+    }
+
+    /**
+     * Encodes the measure values of one cuboid level and saves the result as a Hadoop sequence file.
+     *
+     * @param rdd              pairs of row key bytes and measure value objects for the level
+     * @param codec            codec used to encode the measure value objects into a byte buffer
+     * @param hdfsBaseLocation base location from which the per-level output path is derived
+     * @param level            cuboid level; selects the output path and appears in the log messages
+     * @param conf             Hadoop configuration handed to the save call
+     */
+    private void persistent(final JavaPairRDD<byte[], Object[]> rdd, final BufferedMeasureCodec codec, final String hdfsBaseLocation, int level, Configuration conf) {
+        final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+
+        // Converts each <row key, measures> pair into a <Text, Text> pair so it can be written as a sequence file.
+        final PairFunction<Tuple2<byte[], Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> toTextPair = new PairFunction<Tuple2<byte[], Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+            @Override
+            public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<byte[], Object[]> record) throws Exception {
+                final ByteBuffer encoded = codec.encode(record._2());
+                // Copy the first position() bytes of the backing array into a right-sized array.
+                final int length = encoded.position();
+                final byte[] valueBytes = new byte[length];
+                System.arraycopy(encoded.array(), 0, valueBytes, 0, length);
+                final org.apache.hadoop.io.Text keyText = new org.apache.hadoop.io.Text(record._1());
+                final org.apache.hadoop.io.Text valueText = new org.apache.hadoop.io.Text(valueBytes);
+                return new Tuple2<>(keyText, valueText);
+            }
+        };
+        final JavaPairRDD<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> serializedRDD = rdd.mapToPair(toTextPair);
+
+        logger.debug("Persisting RDD for level " + level + " into " + cuboidOutputPath);
+        serializedRDD.saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf);
+        logger.debug("Done: persisting RDD for level " + level);
+    }
+
+    /**
+     * Reduce function that merges two measure value arrays into a new array of {@code measureNum} slots.
+     * <p>
+     * The {@link MeasureAggregators} doing the merge is held in a transient {@link ThreadLocal}: it is
+     * created lazily on the first {@code call} of each thread, and the holder itself is re-created in
+     * {@code readObject} because transient fields are not restored by default deserialization.
+     * <p>
+     * NOTE(review): this is a non-static inner class, so every instance keeps a reference to the
+     * enclosing object; presumably that object is serialized together with the function - confirm
+     * this is intended, or make the class static.
+     */
+    class CuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
+        BufferedMeasureCodec codec;
+        CubeDesc cubeDesc;
+        int measureNum;
+        // Per-thread aggregator holder; transient, hence rebuilt in readObject().
+        transient ThreadLocal<MeasureAggregators> current = new ThreadLocal<>();
+
+        /**
+         * @param measureNum number of slots in the result array produced by {@code call}
+         * @param cubeDesc   cube description whose measures define the aggregators
+         * @param codec      measure codec; stored but not used by this class itself
+         */
+        CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec) {
+            this.codec = codec;
+            this.cubeDesc = cubeDesc;
+            this.measureNum = measureNum;
+        }
+
+        /**
+         * Aggregates both inputs and returns the collected states in a newly allocated array.
+         *
+         * @param input1 first measure value array
+         * @param input2 second measure value array
+         * @return a new array of length {@code measureNum} holding the aggregated states
+         */
+        @Override
+        public Object[] call(Object[] input1, Object[] input2) throws Exception {
+            // Look the aggregator up once instead of hitting the ThreadLocal for every step.
+            MeasureAggregators aggregators = current.get();
+            if (aggregators == null) {
+                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+                current.set(aggregators);
+            }
+            Object[] result = new Object[measureNum];
+            // The aggregator is reused across calls, so clear the previous state first.
+            aggregators.reset();
+            aggregators.aggregate(input1);
+            aggregators.aggregate(input2);
+            aggregators.collectStates(result);
+            return result;
+        }
+
+        /**
+         * Restores the non-transient fields and re-creates the transient {@code current} holder.
+         */
+        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+            in.defaultReadObject();
+            // Diamond instead of the raw type, matching the field initializer.
+            current = new ThreadLocal<>();
+        }
+    }
+
+    /**
+     * Variant of {@link CuboidReducerFunction2} that forwards a per-measure {@code needAggr} flag
+     * array to the aggregators for both inputs.
+     */
+    class BaseCuboidReducerFunction2 extends CuboidReducerFunction2 {
+        // One flag per measure, passed unchanged to MeasureAggregators.aggregate(Object[], boolean[]).
+        boolean[] needAggr;
+
+        /**
+         * @param measureNum number of slots in the result array produced by {@code call}
+         * @param cubeDesc   cube description whose measures define the aggregators
+         * @param codec      measure codec, handed to the parent constructor
+         * @param needAggr   per-measure flags forwarded to the aggregators
+         */
+        BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec, boolean[] needAggr) {
+            super(measureNum, cubeDesc, codec);
+            this.needAggr = needAggr;
+        }
+
+        /**
+         * Aggregates both inputs under the {@code needAggr} flags and returns the collected states.
+         *
+         * @param input1 first measure value array
+         * @param input2 second measure value array
+         * @return a new array of length {@code measureNum} holding the aggregated states
+         */
+        @Override
+        public Object[] call(Object[] input1, Object[] input2) throws Exception {
+            MeasureAggregators aggregators = current.get();
+            if (aggregators == null) {
+                // First call on this thread: create the aggregator and remember it.
+                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+                current.set(aggregators);
+            }
+            aggregators.reset();
+            final Object[] merged = new Object[measureNum];
+            aggregators.aggregate(input1, needAggr);
+            aggregators.aggregate(input2, needAggr);
+            aggregators.collectStates(merged);
+            return merged;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 7c88372..644f73f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.Logger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -38,11 +39,16 @@ public class SparkExecutable extends AbstractExecutable {
     private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutable.class);
 
     private static final String CLASS_NAME = "className";
+    private static final String JARS = "jars";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
     }
 
+    public void setJars(String jars) {
+        this.setParam(JARS, jars);
+    }
+
     private String formatArgs() {
         StringBuilder stringBuilder = new StringBuilder();
         for (Map.Entry<String, String> entry : getParams().entrySet()) {
@@ -50,6 +56,9 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
+            } else if (entry.getKey().equals(JARS)) {
+                // JARS is for spark-submit, not for app
+                continue;
             } else {
                 stringBuilder.append(tmp);
             }
@@ -65,12 +74,22 @@ public class SparkExecutable extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final KylinConfig config = context.getConfig();
         Preconditions.checkNotNull(config.getSparkHome());
-        Preconditions.checkNotNull(config.getSparkMaster());
+        Preconditions.checkNotNull(config.getKylinJobJarPath());
+        String sparkConf = config.getSparkConfFile();
+        String jars = this.getParam(JARS);
+
+        String jobJar = config.getKylinJobJarPath();
+
+        if (StringUtils.isEmpty(jars)) {
+            jars = jobJar;
+        }
+
         try {
-            String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --master %s %s %s", config.getSparkHome(), config.getSparkMaster(), config.getKylinSparkJobJarPath(), formatArgs());
+            String cmd = String.format("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --properties-file %s --jars %s %s %s", config.getSparkHadoopConfDir(), config.getSparkHome(), sparkConf, jars, jobJar, formatArgs());
             logger.info("cmd:" + cmd);
             final StringBuilder output = new StringBuilder();
-            config.getCliCommandExecutor().execute(cmd, new Logger() {
+            CliCommandExecutor exec = new CliCommandExecutor();
+            exec.execute(cmd, new Logger() {
                 @Override
                 public void log(String message) {
                     output.append(message);
@@ -84,4 +103,5 @@ public class SparkExecutable extends AbstractExecutable {
             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/examples/test_case_data/sandbox/kylin-spark-conf.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin-spark-conf.properties b/examples/test_case_data/sandbox/kylin-spark-conf.properties
new file mode 100644
index 0000000..ca65994
--- /dev/null
+++ b/examples/test_case_data/sandbox/kylin-spark-conf.properties
@@ -0,0 +1,27 @@
+spark.yarn.submit.file.replication=1
+spark.yarn.executor.memoryOverhead=200
+spark.yarn.driver.memoryOverhead=384
+#spark.master=local[4]
+spark.master=yarn
+spark.submit.deployMode=cluster
+spark.eventLog.enabled=true
+spark.yarn.scheduler.heartbeat.interval-ms=5000
+spark.yarn.preserve.staging.files=true
+spark.yarn.queue=default
+spark.yarn.containerLauncherMaxThreads=25
+spark.yarn.max.executor.failures=3
+spark.eventLog.dir=hdfs\:///spark-history
+spark.history.kerberos.enabled=true
+spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
+spark.history.ui.port=18080
+spark.history.fs.logDirectory=hdfs\:///spark-history
+spark.executor.memory=1G
+spark.storage.memoryFraction=0.3
+spark.executor.cores=1
+spark.executor.instances=1
+spark.history.kerberos.keytab=none
+spark.history.kerberos.principal=none
+spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+spark.driver.extraJavaOptions=-Dhdp.version=current
+spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+spark.executor.extraJavaOptions=-Dhdp.version=current

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 20bc427..db8eb7a 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -63,7 +63,7 @@ kylin.job.retry=0
 # you will have to specify kylin.job.remote-cli-hostname, kylin.job.remote-cli-username and kylin.job.remote-cli-password
 # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine
 # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands)
-kylin.job.use-remote-cli=false
+kylin.job.use-remote-cli=true
 
 # Only necessary when kylin.job.use-remote-cli=true
 kylin.job.remote-cli-hostname=sandbox
@@ -154,3 +154,12 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600
 
 # Env DEV|QA|PROD
 kylin.env=DEV
+kylin.source.hive.keep-flat-table=true
+
+### Spark ###
+#kylin.engine.spark.spark-home=/usr/hdp/2.2.4.2-2/spark
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf
+kylin.engine.spark.env.hadoop-conf-dir=/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox
+kylin.engine.spark.spark-home=/Users/shishaofeng/spark-1.6.3-bin-hadoop2.6
+kylin.engine.spark.properties-file=/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/kylin-spark-conf.properties
+kylin.engine.spark.conf.jars=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 51479c8..8c2434d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
         <commons-math3.version>3.6.1</commons-math3.version>
 
         <!-- Spark -->
-        <spark.version>1.3.0</spark.version>
+        <spark.version>1.6.3</spark.version>
 
         <!-- Utility -->
         <log4j.version>1.2.17</log4j.version>

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 6e3668b..4b8aa62 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -327,10 +327,10 @@ public class CubeController extends BasicController {
                 throw new InternalErrorException("Cannot find cube '" + cubeName + "'");
             }
 
-            if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) {
-                int num = cube.getBuildingSegments().size();
-                throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. Discard the related job first."));
-            }
+//            if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) {
+//                int num = cube.getBuildingSegments().size();
+//                throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. Discard the related job first."));
+//            }
 
             return cubeService.purgeCube(cube);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e0665c9/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index cf92fb1..f3b61ef 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -249,6 +249,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <!-- Spark dependency -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-testing-util</artifactId>