You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 15:18:49 UTC

incubator-kylin git commit: KYLIN-875 Refactor core-metadata, drop hadoop/hbase dependency

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 d2456215c -> ecd78c5dd


KYLIN-875 Refactor core-metadata, drop hadoop/hbase dependency


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ecd78c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ecd78c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ecd78c5d

Branch: refs/heads/0.8
Commit: ecd78c5ddb71a6c4691cb2cc0f6dad8f658f84a9
Parents: d245621
Author: Yang Li <li...@apache.org>
Authored: Wed Jul 22 21:17:54 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Jul 22 21:17:54 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/persistence/ResourceStore.java |   6 +-
 core-metadata/pom.xml                           |  51 ---------
 .../metadata/measure/DoubleMaxAggregator.java   |  12 +--
 .../metadata/measure/DoubleMinAggregator.java   |  14 +--
 .../kylin/metadata/measure/DoubleMutable.java   |  40 +++++++
 .../metadata/measure/DoubleSumAggregator.java   |  12 +--
 .../kylin/metadata/measure/LDCAggregator.java   |  15 +--
 .../metadata/measure/LongMaxAggregator.java     |  14 +--
 .../metadata/measure/LongMinAggregator.java     |  14 +--
 .../kylin/metadata/measure/LongMutable.java     |  40 +++++++
 .../metadata/measure/LongSumAggregator.java     |  12 +--
 .../kylin/metadata/measure/MeasureCodec.java    |   2 +-
 .../measure/fixedlen/FixedPointLongCodec.java   |  12 +--
 .../serializer/BigDecimalSerializer.java        | 106 +++++++++++++++++++
 .../measure/serializer/DataTypeSerializer.java  | 101 ++++++++++++++++++
 .../measure/serializer/DateTimeSerializer.java  |  59 +++++++++++
 .../measure/serializer/DoubleSerializer.java    |  78 ++++++++++++++
 .../measure/serializer/HLLCSerializer.java      |  92 ++++++++++++++++
 .../measure/serializer/LongSerializer.java      |  85 +++++++++++++++
 .../measure/serializer/StringSerializer.java    |  50 +++++++++
 .../kylin/metadata/model/ParameterDesc.java     |   5 +-
 .../serializer/BigDecimalSerializer.java        | 106 -------------------
 .../metadata/serializer/DataTypeSerializer.java | 101 ------------------
 .../metadata/serializer/DateTimeSerializer.java |  59 -----------
 .../metadata/serializer/DoubleSerializer.java   |  80 --------------
 .../metadata/serializer/HLLCSerializer.java     |  92 ----------------
 .../metadata/serializer/LongSerializer.java     |  87 ---------------
 .../metadata/serializer/StringSerializer.java   |  50 ---------
 .../serializer/BigDecimalSerializerTest.java    |  53 ++++++++++
 .../serializer/BigDecimalSerializerTest.java    |  52 ---------
 .../kylin/storage/cube/CubeCodeSystem.java      |   2 +-
 .../storage/gridtable/GTSampleCodeSystem.java   |   2 +-
 .../gridtable/AggregationCacheMemSizeTest.java  |   8 +-
 .../gridtable/SimpleInvertedIndexTest.java      |   2 +-
 34 files changed, 757 insertions(+), 757 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index ed3a6a5..a89d30f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -63,14 +63,14 @@ abstract public class ResourceStore {
     public static final ArrayList<Class<? extends ResourceStore>> knownImpl = new ArrayList<Class<? extends ResourceStore>>();
 
     static {
+        knownImpl.add(FileResourceStore.class);
         try {
             knownImpl.add(ClassUtil.forName("org.apache.kylin.storage.hbase.HBaseResourceStore", ResourceStore.class));
-            knownImpl.add(FileResourceStore.class);
         } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
+            logger.warn(e.toString());
         }
     }
-
+    
     public static ResourceStore getStore(KylinConfig kylinConfig) {
         ResourceStore r = CACHE.get(kylinConfig);
         List<Throwable> es = new ArrayList<Throwable>();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/core-metadata/pom.xml b/core-metadata/pom.xml
index 71457bc..2a26846 100644
--- a/core-metadata/pom.xml
+++ b/core-metadata/pom.xml
@@ -49,57 +49,6 @@
             <scope>test</scope>
             <version>${project.parent.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
-            <!-- protobuf version conflict with hbase-->
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <scope>provided</scope>
-        </dependency>
-         <dependency>
-            <groupId>org.apache.hive.hcatalog</groupId>
-            <artifactId>hive-hcatalog-core</artifactId>
-            <version>${hive-hcatalog.version}</version>
-            <scope>provided</scope>
-          </dependency>
-
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-testing-util</artifactId>
-            <version>${hbase-hadoop2.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet.jsp</groupId>
-                    <artifactId>jsp-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
 
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
index 1e5bf82..2b4e9e4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
@@ -18,15 +18,13 @@
 
 package org.apache.kylin.metadata.measure;
 
-import org.apache.hadoop.io.DoubleWritable;
-
 /**
  * @author yangli9
  * 
  */
-public class DoubleMaxAggregator extends MeasureAggregator<DoubleWritable> {
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
 
-    DoubleWritable max = null;
+    DoubleMutable max = null;
 
     @Override
     public void reset() {
@@ -34,15 +32,15 @@ public class DoubleMaxAggregator extends MeasureAggregator<DoubleWritable> {
     }
 
     @Override
-    public void aggregate(DoubleWritable value) {
+    public void aggregate(DoubleMutable value) {
         if (max == null)
-            max = new DoubleWritable(value.get());
+            max = new DoubleMutable(value.get());
         else if (max.get() < value.get())
             max.set(value.get());
     }
 
     @Override
-    public DoubleWritable getState() {
+    public DoubleMutable getState() {
         return max;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
index d9112f7..8811ce3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
@@ -18,15 +18,11 @@
 
 package org.apache.kylin.metadata.measure;
 
-import org.apache.hadoop.io.DoubleWritable;
-
 /**
- * @author yangli9
- * 
  */
-public class DoubleMinAggregator extends MeasureAggregator<DoubleWritable> {
+public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
 
-    DoubleWritable min = null;
+    DoubleMutable min = null;
 
     @Override
     public void reset() {
@@ -34,15 +30,15 @@ public class DoubleMinAggregator extends MeasureAggregator<DoubleWritable> {
     }
 
     @Override
-    public void aggregate(DoubleWritable value) {
+    public void aggregate(DoubleMutable value) {
         if (min == null)
-            min = new DoubleWritable(value.get());
+            min = new DoubleMutable(value.get());
         else if (min.get() > value.get())
             min.set(value.get());
     }
 
     @Override
-    public DoubleWritable getState() {
+    public DoubleMutable getState() {
         return min;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java
new file mode 100644
index 0000000..efcf822
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+public class DoubleMutable {
+
+    private double v;
+    
+    public DoubleMutable() {
+        this(0);
+    }
+    
+    public DoubleMutable(double v) {
+        set(v);
+    }
+    
+    public double get() {
+        return v;
+    }
+    
+    public void set(double v) {
+        this.v = v;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
index 923bd0b..a931b4a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
@@ -18,15 +18,11 @@
 
 package org.apache.kylin.metadata.measure;
 
-import org.apache.hadoop.io.DoubleWritable;
-
 /**
- * @author yangli9
- * 
  */
-public class DoubleSumAggregator extends MeasureAggregator<DoubleWritable> {
+public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
 
-    DoubleWritable sum = new DoubleWritable();
+    DoubleMutable sum = new DoubleMutable();
 
     @Override
     public void reset() {
@@ -34,12 +30,12 @@ public class DoubleSumAggregator extends MeasureAggregator<DoubleWritable> {
     }
 
     @Override
-    public void aggregate(DoubleWritable value) {
+    public void aggregate(DoubleMutable value) {
         sum.set(sum.get() + value.get());
     }
 
     @Override
-    public DoubleWritable getState() {
+    public DoubleMutable getState() {
         return sum;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
index 767a6d8..c751b4d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
@@ -18,20 +18,15 @@
 
 package org.apache.kylin.metadata.measure;
 
-import org.apache.hadoop.io.LongWritable;
-
 /**
  * Long Distinct Count
- * 
- * @author xjiang
- * 
  */
-public class LDCAggregator extends MeasureAggregator<LongWritable> {
+public class LDCAggregator extends MeasureAggregator<LongMutable> {
 
-    private static LongWritable ZERO = new LongWritable(0);
+    private static LongMutable ZERO = new LongMutable(0);
 
     private HLLCAggregator hllAgg = null;
-    private LongWritable state = new LongWritable(0);
+    private LongMutable state = new LongMutable(0);
 
     @SuppressWarnings("rawtypes")
     public void setDependentAggregator(MeasureAggregator agg) {
@@ -43,11 +38,11 @@ public class LDCAggregator extends MeasureAggregator<LongWritable> {
     }
 
     @Override
-    public void aggregate(LongWritable value) {
+    public void aggregate(LongMutable value) {
     }
 
     @Override
-    public LongWritable getState() {
+    public LongMutable getState() {
         if (hllAgg == null) {
             return ZERO;
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
index 0fac3c7..74916d6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
@@ -18,15 +18,11 @@
 
 package org.apache.kylin.metadata.measure;
 
-import org.apache.hadoop.io.LongWritable;
-
 /**
- * @author yangli9
- * 
  */
-public class LongMaxAggregator extends MeasureAggregator<LongWritable> {
+public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
 
-    LongWritable max = null;
+    LongMutable max = null;
 
     @Override
     public void reset() {
@@ -34,15 +30,15 @@ public class LongMaxAggregator extends MeasureAggregator<LongWritable> {
     }
 
     @Override
-    public void aggregate(LongWritable value) {
+    public void aggregate(LongMutable value) {
         if (max == null)
-            max = new LongWritable(value.get());
+            max = new LongMutable(value.get());
         else if (max.get() < value.get())
             max.set(value.get());
     }
 
     @Override
-    public LongWritable getState() {
+    public LongMutable getState() {
         return max;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
index 4c058d8..ec98d48 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
@@ -18,15 +18,11 @@
 
 package org.apache.kylin.metadata.measure;
 
-import org.apache.hadoop.io.LongWritable;
-
 /**
- * @author yangli9
- * 
  */
-public class LongMinAggregator extends MeasureAggregator<LongWritable> {
+public class LongMinAggregator extends MeasureAggregator<LongMutable> {
 
-    LongWritable min = null;
+    LongMutable min = null;
 
     @Override
     public void reset() {
@@ -34,15 +30,15 @@ public class LongMinAggregator extends MeasureAggregator<LongWritable> {
     }
 
     @Override
-    public void aggregate(LongWritable value) {
+    public void aggregate(LongMutable value) {
         if (min == null)
-            min = new LongWritable(value.get());
+            min = new LongMutable(value.get());
         else if (min.get() > value.get())
             min.set(value.get());
     }
 
     @Override
-    public LongWritable getState() {
+    public LongMutable getState() {
         return min;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java
new file mode 100644
index 0000000..ab258f9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+public class LongMutable {
+
+    private long v;
+    
+    public LongMutable() {
+        this(0);
+    }
+    
+    public LongMutable(long v) {
+        set(v);
+    }
+    
+    public long get() {
+        return v;
+    }
+    
+    public void set(long v) {
+        this.v = v;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
index e40870d..63ea004 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
@@ -18,15 +18,11 @@
 
 package org.apache.kylin.metadata.measure;
 
-import org.apache.hadoop.io.LongWritable;
-
 /**
- * @author yangli9
- * 
  */
-public class LongSumAggregator extends MeasureAggregator<LongWritable> {
+public class LongSumAggregator extends MeasureAggregator<LongMutable> {
 
-    LongWritable sum = new LongWritable();
+    LongMutable sum = new LongMutable();
 
     @Override
     public void reset() {
@@ -34,12 +30,12 @@ public class LongSumAggregator extends MeasureAggregator<LongWritable> {
     }
 
     @Override
-    public void aggregate(LongWritable value) {
+    public void aggregate(LongMutable value) {
         sum.set(sum.get() + value.get());
     }
 
     @Override
-    public LongWritable getState() {
+    public LongMutable getState() {
         return sum;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
index f620ace..296290a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
@@ -21,8 +21,8 @@ package org.apache.kylin.metadata.measure;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.serializer.DataTypeSerializer;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
index 5218101..4014c21 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -18,18 +18,18 @@
 
 package org.apache.kylin.metadata.measure.fixedlen;
 
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.model.DataType;
 
-public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
+public class FixedPointLongCodec extends FixedLenMeasureCodec<LongMutable> {
 
     private static final int SIZE = 8;
     // number of digits after decimal point
     int scale;
     DataType type;
     // avoid massive object creation
-    LongWritable current = new LongWritable();
+    LongMutable current = new LongMutable();
 
     public FixedPointLongCodec(DataType type) {
         this.type = type;
@@ -80,7 +80,7 @@ public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
     }
 
     @Override
-    public LongWritable valueOf(String value) {
+    public LongMutable valueOf(String value) {
         if (value == null)
             current.set(0L);
         else
@@ -97,13 +97,13 @@ public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
     }
 
     @Override
-    public LongWritable read(byte[] buf, int offset) {
+    public LongMutable read(byte[] buf, int offset) {
         current.set(BytesUtil.readLong(buf, offset, SIZE));
         return current;
     }
 
     @Override
-    public void write(LongWritable v, byte[] buf, int offset) {
+    public void write(LongMutable v, byte[] buf, int offset) {
         BytesUtil.writeLong(v == null ? 0 : v.get(), buf, offset, SIZE);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java
new file mode 100644
index 0000000..a4945cb
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+    
+    final DataType type;
+    final int maxLength;
+    
+    int avoidVerbose = 0;
+    
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            if (avoidVerbose % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+        
+        @SuppressWarnings("unused")
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+        int len = in.position() - mark + n;
+        
+        in.position(mark);
+        return len;
+    }
+    
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public BigDecimal valueOf(byte[] value) {
+        if (value == null)
+            return new BigDecimal(0);
+        else
+            return new BigDecimal(Bytes.toString(value));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
new file mode 100644
index 0000000..02acdbb
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DataTypeSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * @author yangli9
+ * 
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    final static Map<String, Class<?>> implementations;
+    static {
+        HashMap<String, Class<?>> impl = Maps.newHashMap();
+        impl.put("varchar", StringSerializer.class);
+        impl.put("decimal", BigDecimalSerializer.class);
+        impl.put("double", DoubleSerializer.class);
+        impl.put("float", DoubleSerializer.class);
+        impl.put("bigint", LongSerializer.class);
+        impl.put("long", LongSerializer.class);
+        impl.put("integer", LongSerializer.class);
+        impl.put("int", LongSerializer.class);
+        impl.put("smallint", LongSerializer.class);
+        impl.put("date", DateTimeSerializer.class);
+        impl.put("datetime", DateTimeSerializer.class);
+        impl.put("timestamp", DateTimeSerializer.class);
+        implementations = Collections.unmodifiableMap(impl);
+
+    }
+
+    public static DataTypeSerializer<?> create(String dataType) {
+        return create(DataType.getInstance(dataType));
+    }
+    
+    public static DataTypeSerializer<?> create(DataType type) {
+        if (type.isHLLC()) {
+            return new HLLCSerializer(type);
+        }
+
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No MeasureSerializer for type " + type);
+
+        try {
+            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
+        } catch (Exception e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+    
+    /** peek into buffer and return the length of serialization */
+    abstract public int peekLength(ByteBuffer in);
+    
+    /** return the max number of bytes to the longest serialization */
+    abstract public int maxLength();
+    
+    /** convert from String to obj (string often come as byte[] in mapred) */
+    abstract public T valueOf(byte[] value);
+    
+    /** convert from String to obj */
+    public T valueOf(String value) {
+        try {
+            return valueOf(value.getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+
+    /** convert from obj to string */
+    public String toString(T value) {
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DateTimeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DateTimeSerializer.java
new file mode 100644
index 0000000..16d83cd
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DateTimeSerializer.java
@@ -0,0 +1,59 @@
+package org.apache.kylin.metadata.measure.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.common.util.DateFormat;
+
+/**
+ * Serializer for date/time values carried as a long inside a {@link LongMutable}.
+ * The value is written as a fixed 8-byte long. Objects returned by
+ * {@link #deserialize(ByteBuffer)} and {@link #valueOf(byte[])} are the calling thread's
+ * shared holder and are overwritten by the next such call on the same thread.
+ */
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread: thread-safe and avoids repeated object creation
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public DateTimeSerializer(DataType type) {
+        // nothing from the type is needed by this serializer
+    }
+
+    /** Returns the calling thread's holder, creating and registering it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null)
+            return holder;
+
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Writes the value as a plain 8-byte long. */
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        long raw = value.get();
+        out.putLong(raw);
+    }
+
+    /** Reads an 8-byte long into the calling thread's holder and returns that holder. */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable holder = current();
+        long raw = in.getLong();
+        holder.set(raw);
+        return holder;
+    }
+
+    /** Always 8: the serialized form is a fixed-width long. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    /** Always 8: the serialized form is a fixed-width long. */
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    /**
+     * Converts the string form of a date/time (as bytes) via
+     * {@code DateFormat.stringToMillis}; a null array yields 0.
+     */
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable holder = current();
+        if (value != null)
+            holder.set(DateFormat.stringToMillis(Bytes.toString(value)));
+        else
+            holder.set(0L);
+        return holder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DoubleSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DoubleSerializer.java
new file mode 100644
index 0000000..09a6776
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/DoubleSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Serializer for double values carried inside a {@link DoubleMutable}.
+ * The value is written as a fixed 8-byte double. Objects returned by
+ * {@link #deserialize(ByteBuffer)} and {@link #valueOf(byte[])} are the calling thread's
+ * shared holder and are overwritten by the next such call on the same thread.
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
+
+    // one reusable holder per thread: thread-safe and avoids repeated object creation
+    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+
+    public DoubleSerializer(DataType type) {
+        // nothing from the type is needed by this serializer
+    }
+
+    /** Returns the calling thread's holder, creating and registering it on first use. */
+    private DoubleMutable current() {
+        DoubleMutable holder = current.get();
+        if (holder != null)
+            return holder;
+
+        holder = new DoubleMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Writes the value as a plain 8-byte double. */
+    @Override
+    public void serialize(DoubleMutable value, ByteBuffer out) {
+        double raw = value.get();
+        out.putDouble(raw);
+    }
+
+    /** Reads an 8-byte double into the calling thread's holder and returns that holder. */
+    @Override
+    public DoubleMutable deserialize(ByteBuffer in) {
+        DoubleMutable holder = current();
+        double raw = in.getDouble();
+        holder.set(raw);
+        return holder;
+    }
+
+    /** Always 8: the serialized form is a fixed-width double. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    /** Always 8: the serialized form is a fixed-width double. */
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    /** Parses the string form of a double (as bytes); a null array yields 0. */
+    @Override
+    public DoubleMutable valueOf(byte[] value) {
+        DoubleMutable holder = current();
+        if (value != null)
+            holder.set(Double.parseDouble(Bytes.toString(value)));
+        else
+            holder.set(0d);
+        return holder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/HLLCSerializer.java
new file mode 100644
index 0000000..9bbe956
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/HLLCSerializer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Serializer for {@link HyperLogLogPlusCounter} values. Each thread reuses a single
+ * counter built with the precision taken from the data type, so objects returned by
+ * {@link #deserialize(ByteBuffer)} and {@link #valueOf(byte[])} are overwritten by the
+ * next such call on the same thread.
+ *
+ * @author yangli9
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+    // one reusable counter per thread: thread-safe and avoids repeated object creation
+    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+
+    private int precision;
+
+    public HLLCSerializer(DataType type) {
+        this.precision = type.getPrecision();
+    }
+
+    /** Returns the calling thread's counter, creating and registering it on first use. */
+    private HyperLogLogPlusCounter current() {
+        HyperLogLogPlusCounter counter = current.get();
+        if (counter != null)
+            return counter;
+
+        counter = new HyperLogLogPlusCounter(precision);
+        current.set(counter);
+        return counter;
+    }
+
+    /** Writes the counter's registers to the buffer; an IOException is rethrown unchecked. */
+    @Override
+    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
+        try {
+            value.writeRegisters(out);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Reads registers from the buffer into the calling thread's counter and returns it. */
+    @Override
+    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+        HyperLogLogPlusCounter counter = current();
+        try {
+            counter.readRegisters(in);
+            return counter;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Delegates to the calling thread's counter. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        HyperLogLogPlusCounter counter = current();
+        return counter.peekLength(in);
+    }
+
+    /** Delegates to the calling thread's counter. */
+    @Override
+    public int maxLength() {
+        HyperLogLogPlusCounter counter = current();
+        return counter.maxLength();
+    }
+
+    /**
+     * Clears the calling thread's counter and adds the single given value to it;
+     * a null array is counted as the placeholder string "__nUlL__".
+     */
+    @Override
+    public HyperLogLogPlusCounter valueOf(byte[] value) {
+        HyperLogLogPlusCounter counter = current();
+        counter.clear();
+        if (value != null)
+            counter.add(value);
+        else
+            counter.add("__nUlL__");
+        return counter;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/LongSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/LongSerializer.java
new file mode 100644
index 0000000..9b4a99a
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/LongSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Serializer for long values carried inside a {@link LongMutable}, written in the
+ * variable-length form produced by {@code BytesUtil.writeVLong}. Objects returned by
+ * {@link #deserialize(ByteBuffer)} and {@link #valueOf(byte[])} are the calling thread's
+ * shared holder and are overwritten by the next such call on the same thread.
+ */
+public class LongSerializer extends DataTypeSerializer<LongMutable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public LongSerializer(DataType type) {
+    }
+
+    /** Writes the value as a variable-length long. */
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        BytesUtil.writeVLong(value.get(), out);
+    }
+
+    /** Returns the calling thread's holder, creating and registering it on first use. */
+    private LongMutable current() {
+        LongMutable l = current.get();
+        if (l == null) {
+            l = new LongMutable();
+            current.set(l);
+        }
+        return l;
+    }
+
+    /** Reads a variable-length long into the calling thread's holder and returns that holder. */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable l = current();
+        l.set(BytesUtil.readVLong(in));
+        return l;
+    }
+
+    /**
+     * Returns how many bytes the variable-length long at the buffer's current position
+     * occupies. The buffer position is always restored, including when the read fails.
+     */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+        try {
+            BytesUtil.readVLong(in);
+            return in.position() - mark;
+        } finally {
+            // a peek must not consume input, even if the read throws on a truncated buffer
+            in.position(mark);
+        }
+    }
+
+    @Override
+    public int maxLength() {
+        return 9; // vlong: 1 + 8
+    }
+
+    /** Parses the string form of a long (as bytes); a null array yields 0. */
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable l = current();
+        if (value == null)
+            l.set(0L);
+        else
+            l.set(Long.parseLong(Bytes.toString(value)));
+        return l;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java
new file mode 100644
index 0000000..586bfd9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java
@@ -0,0 +1,50 @@
+package org.apache.kylin.metadata.measure.serializer;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Serializer for string values, written with {@code BytesUtil.writeUTFString}.
+ * The serialized form may take at most {@code 2 + type.getPrecision()} bytes.
+ */
+public class StringSerializer extends DataTypeSerializer<String> {
+
+    final DataType type;
+    final int maxLength;
+
+    public StringSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 2 byte length, rest is String.toBytes()
+        this.maxLength = 2 + type.getPrecision();
+    }
+
+    /**
+     * Writes the string to the buffer.
+     *
+     * @throws IllegalArgumentException if the serialized form is longer than
+     *         {@link #maxLength()}; the buffer position is then reset to where it was
+     *         before the call
+     */
+    @Override
+    public void serialize(String value, ByteBuffer out) {
+        int start = out.position();
+
+        BytesUtil.writeUTFString(value, out);
+
+        if (out.position() - start > maxLength) {
+            // roll back the oversized write so a rejected value leaves no bytes behind
+            out.position(start);
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+    }
+
+    /** Reads a string written by {@link #serialize(String, ByteBuffer)}. */
+    @Override
+    public String deserialize(ByteBuffer in) {
+        return BytesUtil.readUTFString(in);
+    }
+
+    /** Delegates to {@code BytesUtil.peekByteArrayLength}. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return BytesUtil.peekByteArrayLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    /** Decodes the bytes into a string via {@code Bytes.toString}. */
+    @Override
+    public String valueOf(byte[] value) {
+        return Bytes.toString(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 14a073f..ca26e44 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -21,12 +21,13 @@ package org.apache.kylin.metadata.model;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.hadoop.util.StringUtils;
 
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
@@ -79,7 +80,7 @@ public class ParameterDesc {
             for (int i = 0; i < values.length; i++)
                 values[i] = values[i].toUpperCase();
             Arrays.sort(values);
-            value = StringUtils.join(",", values);
+            value = StringUtils.join(values, ",");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
deleted file mode 100644
index 72e3696..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/BigDecimalSerializer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.serializer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-/**
- * @author yangli9
- * 
- */
-public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
-
-    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
-    
-    final DataType type;
-    final int maxLength;
-    
-    int avoidVerbose = 0;
-    
-    public BigDecimalSerializer(DataType type) {
-        this.type = type;
-        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
-        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
-    }
-
-    @Override
-    public void serialize(BigDecimal value, ByteBuffer out) {
-        if (value.scale() > type.getScale()) {
-            if (avoidVerbose % 10000 == 0) {
-                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
-            }
-            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
-        }
-        byte[] bytes = value.unscaledValue().toByteArray();
-        if (bytes.length + 2 > maxLength) {
-            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
-        }
-
-        BytesUtil.writeVInt(value.scale(), out);
-        BytesUtil.writeVInt(bytes.length, out);
-        out.put(bytes);
-    }
-
-    @Override
-    public BigDecimal deserialize(ByteBuffer in) {
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-
-        byte[] bytes = new byte[n];
-        in.get(bytes);
-
-        return new BigDecimal(new BigInteger(bytes), scale);
-    }
-
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-        
-        @SuppressWarnings("unused")
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-        int len = in.position() - mark + n;
-        
-        in.position(mark);
-        return len;
-    }
-    
-    @Override
-    public int maxLength() {
-        return maxLength;
-    }
-
-    @Override
-    public BigDecimal valueOf(byte[] value) {
-        if (value == null)
-            return new BigDecimal(0);
-        else
-            return new BigDecimal(Bytes.toString(value));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
deleted file mode 100644
index 739cde4..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.serializer;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- * @author yangli9
- * 
- */
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
-
-    final static Map<String, Class<?>> implementations;
-    static {
-        HashMap<String, Class<?>> impl = Maps.newHashMap();
-        impl.put("varchar", StringSerializer.class);
-        impl.put("decimal", BigDecimalSerializer.class);
-        impl.put("double", DoubleSerializer.class);
-        impl.put("float", DoubleSerializer.class);
-        impl.put("bigint", LongSerializer.class);
-        impl.put("long", LongSerializer.class);
-        impl.put("integer", LongSerializer.class);
-        impl.put("int", LongSerializer.class);
-        impl.put("smallint", LongSerializer.class);
-        impl.put("date", DateTimeSerializer.class);
-        impl.put("datetime", DateTimeSerializer.class);
-        impl.put("timestamp", DateTimeSerializer.class);
-        implementations = Collections.unmodifiableMap(impl);
-
-    }
-
-    public static DataTypeSerializer<?> create(String dataType) {
-        return create(DataType.getInstance(dataType));
-    }
-    
-    public static DataTypeSerializer<?> create(DataType type) {
-        if (type.isHLLC()) {
-            return new HLLCSerializer(type);
-        }
-
-        Class<?> clz = implementations.get(type.getName());
-        if (clz == null)
-            throw new RuntimeException("No MeasureSerializer for type " + type);
-
-        try {
-            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
-        } catch (Exception e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-    
-    /** peek into buffer and return the length of serialization */
-    abstract public int peekLength(ByteBuffer in);
-    
-    /** return the max number of bytes to the longest serialization */
-    abstract public int maxLength();
-    
-    /** convert from String to obj (string often come as byte[] in mapred) */
-    abstract public T valueOf(byte[] value);
-    
-    /** convert from String to obj */
-    public T valueOf(String value) {
-        try {
-            return valueOf(value.getBytes("UTF-8"));
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-
-    /** convert from obj to string */
-    public String toString(T value) {
-        if (value == null)
-            return "NULL";
-        else
-            return value.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
deleted file mode 100644
index 7c0fd8e..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DateTimeSerializer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package org.apache.kylin.metadata.serializer;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.common.util.DateFormat;
-
-public class DateTimeSerializer extends DataTypeSerializer<LongWritable> {
-    
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>();
-
-    public DateTimeSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(LongWritable value, ByteBuffer out) {
-        out.putLong(value.get());
-    }
-
-    private LongWritable current() {
-        LongWritable l = current.get();
-        if (l == null) {
-            l = new LongWritable();
-            current.set(l);
-        }
-        return l;
-    }
-    
-    @Override
-    public LongWritable deserialize(ByteBuffer in) {
-        LongWritable l = current();
-        l.set(in.getLong());
-        return l;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return 8;
-    }
-    
-    @Override
-    public int maxLength() {
-        return 8;
-    }
-
-    @Override
-    public LongWritable valueOf(byte[] value) {
-        LongWritable l = current();
-        if (value == null)
-            l.set(0L);
-        else
-            l.set(DateFormat.stringToMillis(Bytes.toString(value)));
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
deleted file mode 100644
index 2a92535..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/DoubleSerializer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.serializer;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- * @author yangli9
- * 
- */
-public class DoubleSerializer extends DataTypeSerializer<DoubleWritable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<DoubleWritable> current = new ThreadLocal<DoubleWritable>();
-
-    public DoubleSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(DoubleWritable value, ByteBuffer out) {
-        out.putDouble(value.get());
-    }
-
-    private DoubleWritable current() {
-        DoubleWritable d = current.get();
-        if (d == null) {
-            d = new DoubleWritable();
-            current.set(d);
-        }
-        return d;
-    }
-    
-    @Override
-    public DoubleWritable deserialize(ByteBuffer in) {
-        DoubleWritable d = current();
-        d.set(in.getDouble());
-        return d;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return 8;
-    }
-
-    @Override
-    public int maxLength() {
-        return 8;
-    }
-    
-    @Override
-    public DoubleWritable valueOf(byte[] value) {
-        DoubleWritable d = current();
-        if (value == null)
-            d.set(0d);
-        else
-            d.set(Double.parseDouble(Bytes.toString(value)));
-        return d;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
deleted file mode 100644
index 3f923e8..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/HLLCSerializer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.serializer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- * @author yangli9
- * 
- */
-public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
-    
-    private int precision;
-
-    public HLLCSerializer(DataType type) {
-        this.precision = type.getPrecision();
-    }
-
-    @Override
-    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
-        try {
-            value.writeRegisters(out);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private HyperLogLogPlusCounter current() {
-        HyperLogLogPlusCounter hllc = current.get();
-        if (hllc == null) {
-            hllc = new HyperLogLogPlusCounter(precision);
-            current.set(hllc);
-        }
-        return hllc;
-    }
-    
-    @Override
-    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
-        HyperLogLogPlusCounter hllc = current();
-        try {
-            hllc.readRegisters(in);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return hllc;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return current().peekLength(in);
-    }
-    
-    @Override
-    public int maxLength() {
-        return current().maxLength();
-    }
-
-    @Override
-    public HyperLogLogPlusCounter valueOf(byte[] value) {
-        HyperLogLogPlusCounter hllc = current();
-        hllc.clear();
-        if (value == null)
-            hllc.add("__nUlL__");
-        else
-            hllc.add(value);
-        return hllc;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
deleted file mode 100644
index 81b4b20..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/LongSerializer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.serializer;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- * @author yangli9
- * 
- */
-public class LongSerializer extends DataTypeSerializer<LongWritable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongWritable> current = new ThreadLocal<LongWritable>();
-
-    public LongSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(LongWritable value, ByteBuffer out) {
-        BytesUtil.writeVLong(value.get(), out);
-    }
-
-    private LongWritable current() {
-        LongWritable l = current.get();
-        if (l == null) {
-            l = new LongWritable();
-            current.set(l);
-        }
-        return l;
-    }
-    
-    @Override
-    public LongWritable deserialize(ByteBuffer in) {
-        LongWritable l = current();
-        l.set(BytesUtil.readVLong(in));
-        return l;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-        
-        BytesUtil.readVLong(in);
-        int len = in.position() - mark;
-        
-        in.position(mark);
-        return len;
-    }
-    
-    @Override
-    public int maxLength() {
-        return 9; // vlong: 1 + 8
-    }
-
-    @Override
-    public LongWritable valueOf(byte[] value) {
-        LongWritable l = current();
-        if (value == null)
-            l.set(0L);
-        else
-            l.set(Long.parseLong(Bytes.toString(value)));
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
deleted file mode 100644
index 8686fb7..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/serializer/StringSerializer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.apache.kylin.metadata.serializer;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
-
-public class StringSerializer extends DataTypeSerializer<String> {
-
-    final DataType type;
-    final int maxLength;
-
-    public StringSerializer(DataType type) {
-        this.type = type;
-        // see serialize(): 2 byte length, rest is String.toBytes()
-        this.maxLength = 2 + type.getPrecision();
-    }
-
-    @Override
-    public void serialize(String value, ByteBuffer out) {
-        int start = out.position();
-
-        BytesUtil.writeUTFString(value, out);
-
-        if (out.position() - start > maxLength)
-            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
-    }
-
-    @Override
-    public String deserialize(ByteBuffer in) {
-        return BytesUtil.readUTFString(in);
-    }
-    
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return BytesUtil.peekByteArrayLength(in);
-    }
-
-    @Override
-    public int maxLength() {
-        return maxLength;
-    }
-
-    @Override
-    public String valueOf(byte[] value) {
-        return Bytes.toString(value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java
new file mode 100644
index 0000000..dc5ff4c
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.metadata.measure.serializer;
+
+import org.apache.kylin.metadata.measure.serializer.BigDecimalSerializer;
+import org.apache.kylin.metadata.model.DataType;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ */
+public class BigDecimalSerializerTest {
+
+    private static BigDecimalSerializer bigDecimalSerializer;
+
+    @BeforeClass
+    public static void beforeClass() {
+        bigDecimalSerializer = new BigDecimalSerializer(DataType.getInstance("decimal"));
+    }
+
+    @Test
+    public void testNormal() {
+        BigDecimal input = new BigDecimal("1234.1234");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        buffer.mark();
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset();
+        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input, output);
+    }
+
+    @Test
+    public void testScaleOutOfRange() {
+        BigDecimal input = new BigDecimal("1234.1234567890");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        buffer.mark();
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset();
+        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOutOfPrecision() {
+        BigDecimal input = new BigDecimal("66855344214907231736.4924");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        bigDecimalSerializer.serialize(input, buffer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/core-metadata/src/test/java/org/apache/kylin/metadata/serializer/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/serializer/BigDecimalSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/serializer/BigDecimalSerializerTest.java
deleted file mode 100644
index e7b7916..0000000
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/serializer/BigDecimalSerializerTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.apache.kylin.metadata.serializer;
-
-import org.apache.kylin.metadata.model.DataType;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- */
-public class BigDecimalSerializerTest {
-
-    private static BigDecimalSerializer bigDecimalSerializer;
-
-    @BeforeClass
-    public static void beforeClass() {
-        bigDecimalSerializer = new BigDecimalSerializer(DataType.getInstance("decimal"));
-    }
-
-    @Test
-    public void testNormal() {
-        BigDecimal input = new BigDecimal("1234.1234");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        buffer.mark();
-        bigDecimalSerializer.serialize(input, buffer);
-        buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
-        assertEquals(input, output);
-    }
-
-    @Test
-    public void testScaleOutOfRange() {
-        BigDecimal input = new BigDecimal("1234.1234567890");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        buffer.mark();
-        bigDecimalSerializer.serialize(input, buffer);
-        buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
-        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testOutOfPrecision() {
-        BigDecimal input = new BigDecimal("66855344214907231736.4924");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        bigDecimalSerializer.serialize(input, buffer);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
index 3d3ed51..2e07c24 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
@@ -13,7 +13,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.serializer.DataTypeSerializer;
+import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
 import org.apache.kylin.storage.gridtable.GTInfo;
 import org.apache.kylin.storage.gridtable.IGTCodeSystem;
 import org.apache.kylin.storage.gridtable.IGTComparator;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
index 71a9962..d3fa42d 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
@@ -5,7 +5,7 @@ import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.serializer.DataTypeSerializer;
+import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
index 6520795..649fd5a 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/AggregationCacheMemSizeTest.java
@@ -23,14 +23,14 @@ import java.util.Random;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.metadata.measure.BigDecimalSumAggregator;
 import org.apache.kylin.metadata.measure.DoubleSumAggregator;
+import org.apache.kylin.metadata.measure.DoubleMutable;
 import org.apache.kylin.metadata.measure.HLLCAggregator;
 import org.apache.kylin.metadata.measure.LongSumAggregator;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.junit.Test;
 
@@ -86,7 +86,7 @@ public class AggregationCacheMemSizeTest {
 
     private LongSumAggregator newLongAggr() {
         LongSumAggregator aggr = new LongSumAggregator();
-        aggr.aggregate(new LongWritable(10));
+        aggr.aggregate(new LongMutable(10));
         return aggr;
     }
 
@@ -103,7 +103,7 @@ public class AggregationCacheMemSizeTest {
 
     private DoubleSumAggregator newDoubleAggr() {
         DoubleSumAggregator aggr = new DoubleSumAggregator();
-        aggr.aggregate(new DoubleWritable(10));
+        aggr.aggregate(new DoubleMutable(10));
         return aggr;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ecd78c5d/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
index ee08f2a..3e6f672 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
@@ -32,9 +32,9 @@ import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.serializer.StringSerializer;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.serializer.StringSerializer;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;