You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:01 UTC

[06/13] incubator-kylin git commit: KYLIN-976 AggregationType interface, serializer and aggregator

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
deleted file mode 100644
index 202596d..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.LongMutable;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- */
-public class LongSerializer extends DataTypeSerializer<LongMutable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
-
-    public LongSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(LongMutable value, ByteBuffer out) {
-        BytesUtil.writeVLong(value.get(), out);
-    }
-
-    private LongMutable current() {
-        LongMutable l = current.get();
-        if (l == null) {
-            l = new LongMutable();
-            current.set(l);
-        }
-        return l;
-    }
-
-    @Override
-    public LongMutable deserialize(ByteBuffer in) {
-        LongMutable l = current();
-        l.set(BytesUtil.readVLong(in));
-        return l;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-
-        BytesUtil.readVLong(in);
-        int len = in.position() - mark;
-
-        in.position(mark);
-        return len;
-    }
-
-    @Override
-    public int maxLength() {
-        return 9; // vlong: 1 + 8
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 5;
-    }
-
-    @Override
-    public LongMutable valueOf(byte[] value) {
-        LongMutable l = current();
-        if (value == null)
-            l.set(0L);
-        else
-            l.set(Long.parseLong(Bytes.toString(value)));
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
index c85c83c..38d728a 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.aggregation.basic;
 
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.common.datatype.LongMutable;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
deleted file mode 100644
index e84278d..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.kylin.aggregation.basic;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
-
-public class StringSerializer extends DataTypeSerializer<String> {
-
-    final DataType type;
-    final int maxLength;
-
-    public StringSerializer(DataType type) {
-        this.type = type;
-        // see serialize(): 2 byte length, rest is String.toBytes()
-        this.maxLength = 2 + type.getPrecision();
-    }
-
-    @Override
-    public void serialize(String value, ByteBuffer out) {
-        int start = out.position();
-
-        BytesUtil.writeUTFString(value, out);
-
-        if (out.position() - start > maxLength)
-            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
-    }
-
-    @Override
-    public String deserialize(ByteBuffer in) {
-        return BytesUtil.readUTFString(in);
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return BytesUtil.peekByteArrayLength(in);
-    }
-
-    @Override
-    public int maxLength() {
-        return maxLength;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return maxLength;
-    }
-
-    @Override
-    public String valueOf(byte[] value) {
-        return Bytes.toString(value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
new file mode 100644
index 0000000..d5ceba5
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import java.util.List;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class HLLCAggregation extends AggregationType {
+    
+    private final DataType dataType;
+    
+    public HLLCAggregation(String dataType) {
+        this.dataType = DataType.getType(dataType);
+        
+        if (this.dataType.getPrecision() < 10 || this.dataType.getPrecision() > 16)
+            throw new IllegalArgumentException("HLLC precision must be between 10 and 16");
+    }
+
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    @Override
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        return HLLCSerializer.class;
+    }
+    
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+      if (dataType.isHLLC())
+          return new HLLCAggregator(dataType.getPrecision());
+      else
+          return new LDCAggregator();
+    }
+
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
new file mode 100644
index 0000000..18c021d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.IAggregationFactory;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+public class HLLCAggregationFactory implements IAggregationFactory {
+
+    @Override
+    public AggregationType createAggregationType(String funcName, String dataType) {
+        if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName) == false)
+            throw new IllegalArgumentException();
+        
+        return new HLLCAggregation(dataType);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
index f7804f4..5612892 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
@@ -21,9 +21,9 @@ package org.apache.kylin.aggregation.hllc;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.DataType;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
index 643bcae..151c1ee 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.aggregation.hllc;
 
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.common.datatype.LongMutable;
 
 /**
  * Long Distinct Count

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
new file mode 100644
index 0000000..251abd9
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.topn;
+
+import java.util.List;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.aggregation.hllc.HLLCSerializer;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class TopNAggregation extends AggregationType {
+
+    private final DataType dataType;
+
+    public TopNAggregation(String dataType) {
+        this.dataType = DataType.getType(dataType);
+        
+        if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000)
+            throw new IllegalArgumentException("TopN precision must be between 1 and 1000");
+    }
+
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    @Override
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        return HLLCSerializer.class;
+    }
+    
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        return new TopNAggregator();
+    }
+
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
new file mode 100644
index 0000000..1ea22c8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.topn;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.IAggregationFactory;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+public class TopNAggregationFactory implements IAggregationFactory {
+
+    @Override
+    public AggregationType createAggregationType(String funcName, String dataType) {
+        if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName) == false)
+            throw new IllegalArgumentException();
+        
+        return new TopNAggregation(dataType);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
index 8c44f8f..8088842 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
@@ -18,17 +18,17 @@
 
 package org.apache.kylin.aggregation.topn;
 
-import org.apache.kylin.aggregation.DataTypeSerializer;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.DoubleDeltaSerializer;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
 
 /**
  * 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index b88f9df..3619d69 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -6,9 +6,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.kylin.aggregation.DataTypeSerializer;
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.aggregation.basic.StringSerializer;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
+import org.apache.kylin.common.datatype.StringSerializer;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index 33db11b..aa0a530 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -11,7 +11,6 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index c95e932..36db773 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -6,11 +6,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index 26f1636..d30186e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -23,9 +23,8 @@ package org.apache.kylin.cube.gridtable;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import org.apache.kylin.aggregation.DataTypeSerializer;
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.aggregation.basic.StringSerializer;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.IGTCodeSystem;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 8c6146b..a393179 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -28,9 +28,9 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.kylin.common.datatype.DoubleMutable;
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.DoubleMutable;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.Pair;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index 951c054..bf4278a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -20,8 +20,8 @@ package org.apache.kylin.cube.inmemcubing;
 import com.google.common.base.Preconditions;
 
 import org.apache.kylin.aggregation.MeasureCodec;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
index d7a48d9..a21fe9f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
@@ -21,7 +21,7 @@ package org.apache.kylin.cube.kv;
 import java.util.Collection;
 import java.util.Comparator;
 
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.common.datatype.DataType;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 1920fc7..8e36009 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -26,13 +26,13 @@ import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index e3d3640..229c679 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -5,10 +5,10 @@ import java.util.BitSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
 import org.apache.kylin.cube.util.KryoUtils;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
 public class GTInfo {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
index be92f73..2783f55 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -2,8 +2,8 @@ package org.apache.kylin.gridtable;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.aggregation.DataTypeSerializer;
 import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
 import org.apache.kylin.common.util.ImmutableBitSet;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
index 9852acc..f8d7f30 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.gridtable.GTInfo.Builder;
-import org.apache.kylin.metadata.model.DataType;
 
 public class UnitTestSupport {
 
@@ -48,11 +48,11 @@ public class UnitTestSupport {
         Builder builder = GTInfo.builder();
         builder.setCodeSystem(new GTSampleCodeSystem());
         builder.setColumns( //
-                DataType.getInstance("varchar(10)"), //
-                DataType.getInstance("varchar(10)"), //
-                DataType.getInstance("varchar(10)"), //
-                DataType.getInstance("bigint"), //
-                DataType.getInstance("decimal") //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("bigint"), //
+                DataType.getType("decimal") //
         );
         builder.setPrimaryKey(setOf(0));
         builder.setColumnPreferIndex(setOf(0));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/test/java/org/apache/kylin/aggregation/basic/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/aggregation/basic/BigDecimalSerializerTest.java b/core-cube/src/test/java/org/apache/kylin/aggregation/basic/BigDecimalSerializerTest.java
deleted file mode 100644
index 27aa07c..0000000
--- a/core-cube/src/test/java/org/apache/kylin/aggregation/basic/BigDecimalSerializerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.kylin.aggregation.basic;
-
-import static org.junit.Assert.assertEquals;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.basic.BigDecimalSerializer;
-import org.apache.kylin.metadata.model.DataType;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- */
-public class BigDecimalSerializerTest {
-
-    private static BigDecimalSerializer bigDecimalSerializer;
-
-    @BeforeClass
-    public static void beforeClass() {
-        bigDecimalSerializer = new BigDecimalSerializer(DataType.getInstance("decimal"));
-    }
-
-    @Test
-    public void testNormal() {
-        BigDecimal input = new BigDecimal("1234.1234");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        buffer.mark();
-        bigDecimalSerializer.serialize(input, buffer);
-        buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
-        assertEquals(input, output);
-    }
-
-    @Test
-    public void testScaleOutOfRange() {
-        BigDecimal input = new BigDecimal("1234.1234567890");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        buffer.mark();
-        bigDecimalSerializer.serialize(input, buffer);
-        buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
-        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testOutOfPrecision() {
-        BigDecimal input = new BigDecimal("66855344214907231736.4924");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        bigDecimalSerializer.serialize(input, buffer);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
index 8ee7b8d..8ae44b6 100644
--- a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -1,22 +1,21 @@
 package org.apache.kylin.aggregation.topn;
 
-import org.apache.kylin.aggregation.topn.TopNCounterSerializer;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-
 /**
  * 
  */
 public class TopNCounterSerializerTest {
 
-    private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getInstance("topn(10)"));
+    private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getType("topn(10)"));
 
     @Test
     public void testSerialization() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
index f407977..93f0419 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -28,10 +28,10 @@ import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator;
 import org.apache.kylin.aggregation.basic.DoubleSumAggregator;
 import org.apache.kylin.aggregation.basic.LongSumAggregator;
 import org.apache.kylin.aggregation.hllc.HLLCAggregator;
+import org.apache.kylin.common.datatype.DoubleMutable;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DoubleMutable;
-import org.apache.kylin.common.util.LongMutable;
 import org.junit.Test;
 
 public class AggregationCacheMemSizeTest {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index f5247e2..b3981e8 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.kylin.gridtable;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -27,9 +27,10 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
 import org.apache.kylin.dict.Dictionary;
@@ -46,7 +47,6 @@ import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.Before;
@@ -408,11 +408,11 @@ public class DictGridTableTest {
         Builder builder = GTInfo.builder();
         builder.setCodeSystem(newDictCodeSystem());
         builder.setColumns( //
-                DataType.getInstance("timestamp"), //
-                DataType.getInstance("integer"), //
-                DataType.getInstance("varchar(10)"), //
-                DataType.getInstance("bigint"), //
-                DataType.getInstance("decimal") //
+                DataType.getType("timestamp"), //
+                DataType.getType("integer"), //
+                DataType.getType("varchar(10)"), //
+                DataType.getType("bigint"), //
+                DataType.getType("decimal") //
         );
         builder.setPrimaryKey(setOf(0, 1));
         builder.setColumnPreferIndex(setOf(0));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 361617a..4313f4b 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -26,8 +26,8 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
index a964e67..7f4da61 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -17,29 +17,28 @@
 
 package org.apache.kylin.gridtable;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import org.apache.kylin.aggregation.basic.StringSerializer;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.LongMutable;
+import org.apache.kylin.common.datatype.StringSerializer;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
-import it.uniroma3.mat.extendedset.intset.ConciseSet;
-
 public class SimpleInvertedIndexTest {
 
     GTInfo info;
@@ -167,7 +166,7 @@ public class SimpleInvertedIndexTest {
     public static ConstantTupleFilter constFilter(int id) {
         byte[] space = new byte[10];
         ByteBuffer buf = ByteBuffer.wrap(space);
-        StringSerializer stringSerializer = new StringSerializer(DataType.getInstance("string"));
+        StringSerializer stringSerializer = new StringSerializer(DataType.getType("string"));
         stringSerializer.serialize("" + id, buf);
         ByteArray data = new ByteArray(buf.array(), buf.arrayOffset(), buf.position());
         return new ConstantTupleFilter(data);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index 0e13d7e..4da5471 100644
--- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -24,9 +24,9 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.kylin.aggregation.MeasureCodec;
+import org.apache.kylin.common.datatype.DoubleMutable;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.DoubleMutable;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 8027f41..300c240 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -21,20 +21,19 @@ package org.apache.kylin.dict;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * @author yangli9
@@ -105,7 +104,7 @@ public class DictionaryGenerator {
             logger.debug("Building dictionary object " + JsonUtil.writeValueAsString(info));
 
             columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex());
-            return buildDictionaryFromValueEnumerator(DataType.getInstance(info.getDataType()), columnValueEnumerator);
+            return buildDictionaryFromValueEnumerator(DataType.getType(info.getDataType()), columnValueEnumerator);
         } finally {
             if (columnValueEnumerator != null)
                 columnValueEnumerator.close();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 981e19a..2f4b761 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -30,10 +30,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable;
@@ -218,7 +218,7 @@ public class DictionaryManager {
             logger.info("Use one of the merging dictionaries directly");
             return dicts.get(0);
         } else {
-            Dictionary<?> newDict = DictionaryGenerator.mergeDictionaries(DataType.getInstance(newDictInfo.getDataType()), dicts);
+            Dictionary<?> newDict = DictionaryGenerator.mergeDictionaries(DataType.getType(newDictInfo.getDataType()), dicts);
             return trySaveNewDict(newDict, newDictInfo);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
index e51153c..8c8dcfc 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
@@ -18,10 +18,9 @@
 
 package org.apache.kylin.dict;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
-import java.io.IOException;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,8 +28,8 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.metadata.model.DataType;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -54,7 +53,7 @@ public class NumberDictionaryTest {
         }
 
         // check "" is treated as NULL, not a code of dictionary
-        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance("integer"), new IterableDictionaryValueEnumerator(intBytes));
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getType("integer"), new IterableDictionaryValueEnumerator(intBytes));
         assertEquals(4, dict.getSize());
 
         final int id = ((NumberDictionary<String>) dict).getIdFromValue("");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
index e4bdf3d..0e687f7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
@@ -5,8 +5,8 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Maps;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 6162477..f03e736 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -18,14 +18,15 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
+
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.Serializable;
-
 /**
  * Column Metadata from Source. All name should be uppercase.
  * <p/>
@@ -72,7 +73,7 @@ public class ColumnDesc implements Serializable {
     public void setDatatype(String datatype) {
         //logger.info("setting datatype to " + datatype);
         this.datatype = datatype;
-        type = DataType.getInstance(datatype);
+        type = DataType.getType(datatype);
     }
 
     public String getId() {
@@ -132,7 +133,7 @@ public class ColumnDesc implements Serializable {
         if (id != null)
             zeroBasedIndex = Integer.parseInt(id) - 1;
 
-        DataType normalized = DataType.getInstance(datatype);
+        DataType normalized = DataType.getType(datatype);
         if (normalized == null) {
             this.setDatatype(null);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
deleted file mode 100644
index 1333426..0000000
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.model;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-
-/**
- */
-@SuppressWarnings("serial")
-public class DataType implements Serializable {
-
-    public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
-            + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
-            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn" //
-            + "|" + TblColRef.InnerDataTypeEnum.LITERAL.getDataType() //
-            + "|" + TblColRef.InnerDataTypeEnum.DERIVED.getDataType();
-
-    private static final Pattern TYPE_PATTERN = Pattern.compile(
-    // standard sql types, ref:
-    // http://www.w3schools.com/sql/sql_datatypes_general.asp
-            "(" + VALID_TYPES_STRING + ")" + "\\s*" //
-                    + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?", Pattern.CASE_INSENSITIVE);
-
-    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
-    public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
-    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
-    public static final Set<String> STRING_FAMILY = new HashSet<String>();
-    private static final Set<Integer> HLLC_PRECISIONS = new HashSet<Integer>();
-    private static final Map<String, String> LEGACY_TYPE_MAP = new HashMap<String, String>();
-    static {
-        INTEGER_FAMILY.add("tinyint");
-        INTEGER_FAMILY.add("smallint");
-        INTEGER_FAMILY.add("integer");
-        INTEGER_FAMILY.add("bigint");
-
-        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
-        NUMBER_FAMILY.add("float");
-        NUMBER_FAMILY.add("double");
-        NUMBER_FAMILY.add("decimal");
-        NUMBER_FAMILY.add("real");
-        NUMBER_FAMILY.add("numeric");
-
-        DATETIME_FAMILY.add("date");
-        DATETIME_FAMILY.add("time");
-        DATETIME_FAMILY.add("datetime");
-        DATETIME_FAMILY.add("timestamp");
-
-        STRING_FAMILY.add("varchar");
-        STRING_FAMILY.add("char");
-
-        LEGACY_TYPE_MAP.put("byte", "tinyint");
-        LEGACY_TYPE_MAP.put("int", "integer");
-        LEGACY_TYPE_MAP.put("short", "smallint");
-        LEGACY_TYPE_MAP.put("long", "bigint");
-        LEGACY_TYPE_MAP.put("string", "varchar");
-        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)");
-        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
-        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
-        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
-        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
-
-        for (int i = 10; i <= 16; i++)
-            HLLC_PRECISIONS.add(i);
-    }
-
-    private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
-
-    public static final DataType ANY = DataType.getInstance("any");
-
-    public static DataType getInstance(String type) {
-        if (type == null)
-            return null;
-
-        DataType dataType = new DataType(type);
-        DataType cached = CACHE.get(dataType);
-        if (cached == null) {
-            CACHE.put(dataType, dataType);
-            cached = dataType;
-        }
-        return cached;
-    }
-
-    // ============================================================================
-
-    private String name;
-    private int precision;
-    private int scale;
-
-    DataType(String datatype) {
-        parseDataType(datatype);
-    }
-
-    private void parseDataType(String datatype) {
-        datatype = datatype.trim().toLowerCase();
-        datatype = replaceLegacy(datatype);
-
-        Matcher m = TYPE_PATTERN.matcher(datatype);
-        if (m.matches() == false)
-            throw new IllegalArgumentException("bad data type -- " + datatype + ", does not match " + TYPE_PATTERN);
-
-        name = replaceLegacy(m.group(1));
-        precision = -1;
-        scale = -1;
-
-        String leftover = m.group(2);
-        if (leftover != null) {
-            String[] parts = leftover.split("\\s*,\\s*");
-            for (int i = 0; i < parts.length; i++) {
-                int n;
-                try {
-                    n = Integer.parseInt(parts[i]);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException("bad data type -- " + datatype + ", precision/scale not numeric");
-                }
-                if (i == 0)
-                    precision = n;
-                else if (i == 1)
-                    scale = n;
-                else
-                    throw new IllegalArgumentException("bad data type -- " + datatype + ", too many precision/scale parts");
-            }
-        }
-
-        // FIXME 256 for unknown string precision
-        if ((name.equals("char") || name.equals("varchar")) && precision == -1) {
-            precision = 256; // to save memory at frontend, e.g. tableau will
-                             // allocate memory according to this
-        }
-
-        // FIXME (19,4) for unknown decimal precision
-        if ((name.equals("decimal") || name.equals("numeric")) && precision == -1) {
-            precision = 19;
-            scale = 4;
-        }
-
-        if (isHLLC() && HLLC_PRECISIONS.contains(precision) == false)
-            throw new IllegalArgumentException("HLLC precision must be one of " + HLLC_PRECISIONS);
-    }
-
-    private String replaceLegacy(String str) {
-        String replace = LEGACY_TYPE_MAP.get(str);
-        return replace == null ? str : replace;
-    }
-
-    public int getStorageBytesEstimate() {
-        return DataTypeSerializer.create(this).getStorageBytesEstimate();
-    }
-
-    public boolean isStringFamily() {
-        return STRING_FAMILY.contains(name);
-    }
-
-    public boolean isIntegerFamily() {
-        return INTEGER_FAMILY.contains(name);
-    }
-
-    public boolean isNumberFamily() {
-        return NUMBER_FAMILY.contains(name);
-    }
-
-    public boolean isDateTimeFamily() {
-        return DATETIME_FAMILY.contains(name);
-    }
-
-    public boolean isDate() {
-        return name.equals("date");
-    }
-
-    public boolean isTime() {
-        return name.equals("time");
-    }
-
-    public boolean isTimestamp() {
-        return name.equals("timestamp");
-    }
-
-    public boolean isDatetime() {
-        return name.equals("datetime");
-    }
-
-    public boolean isTinyInt() {
-        return name.equals("tinyint");
-    }
-
-    public boolean isSmallInt() {
-        return name.equals("smallint");
-    }
-
-    public boolean isInt() {
-        return name.equals("integer");
-    }
-
-    public boolean isBigInt() {
-        return name.equals("bigint");
-    }
-
-    public boolean isFloat() {
-        return name.equals("float");
-    }
-
-    public boolean isDouble() {
-        return name.equals("double");
-    }
-
-    public boolean isDecimal() {
-        return name.equals("decimal");
-    }
-
-    public boolean isHLLC() {
-        return name.equals("hllc");
-    }
-    
-    public boolean isTopN() {
-        return name.equals("topn");
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public int getPrecision() {
-        return precision;
-    }
-
-    public int getScale() {
-        return scale;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((name == null) ? 0 : name.hashCode());
-        result = prime * result + precision;
-        result = prime * result + scale;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        DataType other = (DataType) obj;
-        if (name == null) {
-            if (other.name != null)
-                return false;
-        } else if (!name.equals(other.name))
-            return false;
-        if (precision != other.precision)
-            return false;
-        if (scale != other.scale)
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        if (precision < 0 && scale < 0)
-            return name;
-        else if (scale < 0)
-            return name + "(" + precision + ")";
-        else
-            return name + "(" + precision + "," + scale + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index b8cefa2..d22d0a3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -21,6 +21,8 @@ package org.apache.kylin.metadata.model;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.kylin.common.datatype.DataType;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -53,7 +55,7 @@ public class FunctionDesc {
 
     public void init(TableDesc factTable) {
         expression = expression.toUpperCase();
-        returnDataType = DataType.getInstance(returnType);
+        returnDataType = DataType.getType(returnType);
 
         for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
             p.setValue(p.getValue().toUpperCase());
@@ -69,6 +71,11 @@ public class FunctionDesc {
         }
 
         parameter.setColRefs(colRefs);
+        
+        // make sure sum/max/min returns the exact type as its input
+        if (isSum() || isMax() || isMin() && (colRefs.size() > 0)) {
+            setReturnType(colRefs.get(0).getDatatype());
+        }
     }
 
     public String getRewriteFieldName() {
@@ -183,7 +190,7 @@ public class FunctionDesc {
 
     public void setReturnType(String returnType) {
         this.returnType = returnType;
-        this.returnDataType = DataType.getInstance(returnType);
+        this.returnDataType = DataType.getType(returnType);
     }
 
     public TblColRef selectTblColRef(Collection<TblColRef> metricColumns, String factTableName) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index ddb1e1a..de27145 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -18,10 +18,11 @@
 
 package org.apache.kylin.metadata.model;
 
-import org.apache.commons.lang.StringUtils;
-
 import java.io.Serializable;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.datatype.DataType;
+
 /**
  */
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
index a4cdcbe..4a8c5d1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -1,12 +1,12 @@
 package org.apache.kylin.metadata.realization;
 
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Function;
@@ -52,9 +52,9 @@ public class SQLDigestUtil {
     //ts column type differentiate
     private static String formatTimeStr(DataType type, long ts) {
         String ret;
-        if (type == DataType.getInstance("date")) {
+        if (type == DataType.getType("date")) {
             ret = DateFormat.formatToDateStr(ts);
-        } else if (type == DataType.getInstance("long")) {
+        } else if (type == DataType.getType("long")) {
             ret = String.valueOf(ts);
         } else {
             throw new IllegalArgumentException("Illegal type for partition column " + type);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index da009df..4f011cf 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -24,11 +24,11 @@ import java.util.List;
 
 import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
 
+import org.apache.kylin.common.datatype.DoubleMutable;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.DoubleMutable;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
index 5b2d20e..55235be 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.kylin.aggregation.MeasureCodec;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
index 2acc751..5145efc 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -29,14 +29,19 @@ import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.dict.*;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.dict.TrieDictionary;
 import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.source.ReadableTable.TableSignature;
@@ -71,7 +76,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
         List<byte[]> values = new ArrayList<byte[]>();
         values.add(new byte[] { 101, 101, 101 });
         values.add(new byte[] { 102, 102, 102 });
-        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
         dictionaryManager.trySaveNewDict(dict, newDictInfo);
         ((TrieDictionary) dict).dump(System.out);
 
@@ -123,7 +128,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
                 values.add(new byte[] { 99, 99, 99 });
             else
                 values.add(new byte[] { 98, 98, 98 });
-            Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
+            Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getType(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
             dictionaryManager.trySaveNewDict(dict, newDictInfo);
             ((TrieDictionary) dict).dump(System.out);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
index 11b1897..aad1fa2 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
@@ -21,9 +21,9 @@ package org.apache.kylin.invertedindex.index;
 import java.util.Arrays;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
index 71d7bae..12fcf29 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecord.java
@@ -22,8 +22,8 @@ import java.util.Arrays;
 
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.dict.Dictionary;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 27519bc..d9e5b26 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -20,12 +20,12 @@ package org.apache.kylin.invertedindex.index;
 
 import java.util.List;
 
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**
@@ -71,10 +71,10 @@ public class TableRecordInfo {
             isMetric[i] = desc.isMetricsCol(i);
             dataTypes[i] = tblColRef.getDatatype();
             if (isMetric[i]) {
-                lengths[i] = FixedLenMeasureCodec.get(DataType.getInstance(tblColRef.getColumnDesc().getDatatype())).getLength();
+                lengths[i] = FixedLenMeasureCodec.get(DataType.getType(tblColRef.getColumnDesc().getDatatype())).getLength();
             } else {
                 if (Array.isEmpty(dictionaryMap)) {
-                    final DataType dataType = DataType.getInstance(tblColRef.getColumnDesc().getDatatype());
+                    final DataType dataType = DataType.getType(tblColRef.getColumnDesc().getDatatype());
                     if (dataType.isNumberFamily()) {
                         lengths[i] = 16;
                     } else if (dataType.isStringFamily()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
index bd27e38..afb3633 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfoDigest.java
@@ -21,11 +21,11 @@ package org.apache.kylin.invertedindex.index;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.DataType;
 
 import com.google.common.base.Objects;
 
@@ -55,7 +55,7 @@ public class TableRecordInfoDigest {
         this.measureCodecs = new FixedLenMeasureCodec[nColumns];
         for (int i = 0; i < isMetric.length; i++) {
             if (isMetric[i]) {
-                measureCodecs[i] = FixedLenMeasureCodec.get(DataType.getInstance(metricDataTypes[i]));
+                measureCodecs[i] = FixedLenMeasureCodec.get(DataType.getType(metricDataTypes[i]));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
index de35f91..42e5185 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
@@ -19,8 +19,8 @@ package org.apache.kylin.invertedindex.measure;
 
 import java.nio.ByteBuffer;
 
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.DataType;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
index 35872be..4fa8714 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
@@ -18,10 +18,10 @@
 
 package org.apache.kylin.invertedindex.measure;
 
-import org.apache.kylin.metadata.model.DataType;
-
 import java.nio.ByteBuffer;
 
+import org.apache.kylin.common.datatype.DataType;
+
 abstract public class FixedLenMeasureCodec<T> {
 
     public static FixedLenMeasureCodec<?> get(DataType type) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java
index a5d74f7..2b65192 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodec.java
@@ -20,9 +20,9 @@ package org.apache.kylin.invertedindex.measure;
 
 import java.nio.ByteBuffer;
 
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.LongMutable;
-import org.apache.kylin.metadata.model.DataType;
 
 public class FixedPointLongCodec extends FixedLenMeasureCodec<LongMutable> {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 4dd1723..0f29d95 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
@@ -33,7 +34,6 @@ import org.apache.kylin.invertedindex.index.CompressedValueContainer;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.DataType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -111,11 +111,11 @@ public class IIKeyValueCodec implements KeyValueCodec {
         final boolean emptyDictionary = Array.isEmpty(dictionaries);
         for (int i = 0; i < nColumns; ++i) {
             if (isMetric[i]) {
-                final FixedLenMeasureCodec<?> fixedLenMeasureCodec = FixedLenMeasureCodec.get(DataType.getInstance(dataTypes[i]));
+                final FixedLenMeasureCodec<?> fixedLenMeasureCodec = FixedLenMeasureCodec.get(DataType.getType(dataTypes[i]));
                 lengths[i] = fixedLenMeasureCodec.getLength();
             } else {
                 if (emptyDictionary) {
-                    final DataType dataType = DataType.getInstance(dataTypes[i]);
+                    final DataType dataType = DataType.getType(dataTypes[i]);
                     if (dataType.isNumberFamily()) {
                         lengths[i] = 16;
                     } else if (dataType.isStringFamily()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java
index 27a4f71..600d034 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/measure/FixedPointLongCodecTest.java
@@ -1,7 +1,6 @@
 package org.apache.kylin.invertedindex.measure;
 
-import org.apache.kylin.invertedindex.measure.FixedPointLongCodec;
-import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.common.datatype.DataType;
 import org.junit.Test;
 
 /**
@@ -10,35 +9,35 @@ public class FixedPointLongCodecTest {
 
     @Test
     public void testEncode1() {
-        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)"));
+        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getType("decimal(18,5)"));
         long x = codec.getValueIgnoringDecimalPoint("12.12345");
         org.junit.Assert.assertEquals(1212345, x);
     }
 
     @Test
     public void testEncode2() {
-        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)"));
+        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getType("decimal(18,5)"));
         long x = codec.getValueIgnoringDecimalPoint("12.1234");
         org.junit.Assert.assertEquals(1212340, x);
     }
 
     @Test
     public void testEncode3() {
-        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)"));
+        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getType("decimal(18,5)"));
         long x = codec.getValueIgnoringDecimalPoint("12.123456");
         org.junit.Assert.assertEquals(1212345, x);
     }
 
     @Test
     public void testEncode4() {
-        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)"));
+        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getType("decimal(18,5)"));
         long x = codec.getValueIgnoringDecimalPoint("12");
         org.junit.Assert.assertEquals(1200000, x);
     }
 
     @Test
     public void testDecode1() {
-        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getInstance("decimal(18,5)"));
+        FixedPointLongCodec codec = new FixedPointLongCodec(DataType.getType("decimal(18,5)"));
         String x = codec.restoreDecimalPoint(1212345);
         org.junit.Assert.assertEquals("12.12345", x);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index 8813901..817a1e2 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -12,7 +12,6 @@ import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -21,12 +20,9 @@ import org.apache.kylin.cube.kv.RowKeyDecoder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;