You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:11:34 UTC

[08/49] carbondata git commit: [CARBONDATA-1594] Add precision and scale to DecimalType

[CARBONDATA-1594] Add precision and scale to DecimalType

Refactor on DecimalType to include precision and scale parameter.
Precision and scale parameter is required for Decimal data type. In earlier code, they are stored in following classes:

ColumnSpec
ColumnPageEncoderMeta
PrimitivePageStatsCollector
ColumnSchema
Since now we have changed DataType from enum to class, precision and scale should be stored in DecimalType object only. The PR does this change.

No new test case is added in this PR since no functionality change.

This closes #1417


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f209e8ee
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f209e8ee
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f209e8ee

Branch: refs/heads/fgdatamap
Commit: f209e8ee315a272f1f60a7a037d6c15fc08b6add
Parents: e945449
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 1 00:18:47 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Nov 2 14:20:30 2017 +0530

----------------------------------------------------------------------
 .../cache/dictionary/ColumnDictionaryInfo.java  |   2 +-
 .../carbondata/core/datastore/TableSpec.java    |  75 ++++---
 .../core/datastore/block/SegmentProperties.java |   2 +-
 .../core/datastore/page/ColumnPage.java         |  23 +--
 .../core/datastore/page/LazyColumnPage.java     |   2 +-
 .../datastore/page/UnsafeDecimalColumnPage.java |  12 +-
 .../page/UnsafeFixLengthColumnPage.java         |   2 +-
 .../datastore/page/VarLengthColumnPageBase.java |  15 +-
 .../page/encoding/ColumnPageEncoder.java        |   4 +-
 .../page/encoding/ColumnPageEncoderMeta.java    |  46 +++--
 .../page/encoding/DefaultEncodingFactory.java   |   2 +-
 .../page/encoding/EncodingFactory.java          |   6 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java    |   4 +-
 .../adaptive/AdaptiveIntegralCodec.java         |   2 +-
 .../encoding/compress/DirectCompressCodec.java  |   2 +-
 .../page/statistics/KeyPageStatsCollector.java  |   7 -
 .../page/statistics/LVStringStatsCollector.java |   7 -
 .../statistics/PrimitivePageStatsCollector.java |  44 ++--
 .../page/statistics/SimpleStatsResult.java      |   3 -
 .../blockletindex/BlockletDataMap.java          |   4 +-
 .../ThriftWrapperSchemaConverterImpl.java       |  32 ++-
 .../core/metadata/datatype/DataType.java        |   5 +-
 .../core/metadata/datatype/DataTypes.java       |  23 ++-
 .../datatype/DecimalConverterFactory.java       |  39 +---
 .../core/metadata/datatype/DecimalType.java     |  28 ++-
 .../schema/table/column/ColumnSchema.java       |  41 ++--
 .../impl/AbstractScannedResultCollector.java    |   4 +-
 .../RestructureBasedVectorResultCollector.java  |   2 +-
 .../scan/executor/util/RestructureUtil.java     |   4 +-
 .../core/scan/expression/ExpressionResult.java  |   6 +-
 .../conditional/EqualToExpression.java          |   2 +-
 .../GreaterThanEqualToExpression.java           |   2 +-
 .../conditional/GreaterThanExpression.java      |   2 +-
 .../expression/conditional/InExpression.java    |   2 +-
 .../conditional/LessThanEqualToExpression.java  |   2 +-
 .../conditional/LessThanExpression.java         |   2 +-
 .../conditional/NotEqualsExpression.java        |   2 +-
 .../expression/conditional/NotInExpression.java |   2 +-
 .../carbondata/core/scan/filter/FilterUtil.java |   4 +-
 .../executer/ExcludeFilterExecuterImpl.java     |  22 +-
 .../executer/IncludeFilterExecuterImpl.java     |   4 +-
 .../executer/RowLevelFilterExecuterImpl.java    |   6 +-
 .../core/scan/partition/PartitionUtil.java      |   4 +-
 .../vector/MeasureDataVectorProcessor.java      |   2 +-
 .../util/AbstractDataFileFooterConverter.java   |  12 +-
 .../core/util/CarbonMetadataUtil.java           |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  29 ++-
 .../carbondata/core/util/DataTypeUtil.java      |  98 +--------
 .../core/util/comparator/Comparator.java        |   4 +-
 .../sortindex/CarbonDictionarySortModel.java    |   2 +-
 .../dictionary/ColumnDictionaryInfoTest.java    |   3 +-
 .../datastore/page/encoding/RLECodecSuite.java  |   6 +-
 .../ThriftWrapperSchemaConverterImplTest.java   |  20 +-
 .../scan/executor/util/RestructureUtilTest.java |   5 +
 .../scan/expression/ExpressionResultTest.java   |  13 +-
 .../conditional/EqualToExpressionUnitTest.java  |   3 +-
 .../GreaterThanEqualToExpressionUnitTest.java   |   5 +-
 .../GreaterThanExpressionUnitTest.java          |   5 +-
 .../conditional/InExpressionUnitTest.java       |   5 +-
 .../LessThanEqualToExpressionUnitTest.java      |   5 +-
 .../conditional/LessThanExpressionUnitTest.java |   5 +-
 .../NotEqualsExpressionUnitTest.java            |   5 +-
 .../conditional/NotInExpressionUnitTest.java    |   5 +-
 .../core/scan/filter/FilterUtilTest.java        |   3 +-
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   3 +-
 .../carbondata/core/util/DataTypeUtilTest.java  |  23 +--
 .../CarbonDictionarySortModelTest.java          |  33 +--
 .../hive/CarbonDictionaryDecodeReadSupport.java |   2 +-
 .../carbondata/presto/CarbonTypeUtil.java       |   2 +-
 .../presto/CarbonVectorizedRecordReader.java    |   2 +-
 .../presto/CarbondataColumnHandle.java          |   6 +-
 .../carbondata/presto/CarbondataMetadata.java   |   2 +-
 .../carbondata/presto/PrestoFilterUtil.java     |   5 +-
 .../presto/util/CarbonDataStoreCreator.scala    |   4 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  43 ++--
 .../spark/util/DataTypeConverterUtil.scala      |  37 ++--
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +-
 .../command/carbonTableSchemaCommon.scala       | 201 +++++++++----------
 .../VectorizedCarbonRecordReader.java           |   2 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   3 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     | 145 ++++++-------
 .../apache/spark/sql/hive/CarbonRelation.scala  |   7 +-
 .../partition/impl/HashPartitionerImpl.java     |   2 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        |   6 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |   2 +-
 .../merger/UnsafeIntermediateFileMerger.java    |   2 +-
 .../merger/CompactionResultSortProcessor.java   |   2 +-
 .../sort/sortdata/IntermediateFileMerger.java   |   2 +-
 .../processing/sort/sortdata/SortDataRows.java  |   2 +-
 .../sort/sortdata/SortTempFileChunkHolder.java  |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   2 +-
 .../carbondata/processing/store/TablePage.java  |   7 +-
 93 files changed, 623 insertions(+), 671 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
index 223812e..db49fa1 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
@@ -291,7 +291,7 @@ public class ColumnDictionaryInfo extends AbstractColumnDictionaryInfo {
         dateToStr = parser.parse(memberVal);
         dictionaryDate = parser.parse(dictionaryVal);
         return dictionaryDate.compareTo(dateToStr);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         java.math.BigDecimal javaDecValForDictVal = new java.math.BigDecimal(dictionaryVal);
         java.math.BigDecimal javaDecValForMemberVal = new java.math.BigDecimal(memberVal);
         return javaDecValForDictVal.compareTo(javaDecValForMemberVal);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 9f29e27..eb36c8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -84,8 +85,7 @@ public class TableSpec {
   private void addMeasures(List<CarbonMeasure> measures) {
     for (int i = 0; i < measures.size(); i++) {
       CarbonMeasure measure = measures.get(i);
-      measureSpec[i] = new MeasureSpec(measure.getColName(), measure.getDataType(), measure
-          .getScale(), measure.getPrecision());
+      measureSpec[i] = new MeasureSpec(measure.getColName(), measure.getDataType());
     }
   }
 
@@ -122,27 +122,29 @@ public class TableSpec {
     // dimension type of this dimension
     private ColumnType columnType;
 
-    // scale and precision is for decimal column only
-    // TODO: make DataType a class instead of enum
-    private int scale;
-    private int precision;
-
     public ColumnSpec() {
     }
 
-    public ColumnSpec(String fieldName, DataType schemaDataType, ColumnType columnType) {
-      // for backward compatibility as the precision and scale is not stored, the values should be
-      // initialized with -1 for both precision and scale
-      this(fieldName, schemaDataType, columnType, -1, -1);
-    }
-
-    public ColumnSpec(String fieldName, DataType schemaDataType, ColumnType columnType,
-        int scale, int precision) {
+    private ColumnSpec(String fieldName, DataType schemaDataType, ColumnType columnType) {
       this.fieldName = fieldName;
       this.schemaDataType = schemaDataType;
       this.columnType = columnType;
-      this.scale = scale;
-      this.precision = precision;
+    }
+
+    public static ColumnSpec newInstance(String fieldName, DataType schemaDataType,
+        ColumnType columnType) {
+      return new ColumnSpec(fieldName, schemaDataType, columnType);
+    }
+
+    public static ColumnSpec newInstanceLegacy(String fieldName, DataType schemaDataType,
+        ColumnType columnType) {
+      // for backward compatibility as the precision and scale is not stored, the values should be
+      // initialized with -1 for both precision and scale
+      if (schemaDataType instanceof DecimalType) {
+        ((DecimalType) schemaDataType).setPrecision(-1);
+        ((DecimalType) schemaDataType).setScale(-1);
+      }
+      return new ColumnSpec(fieldName, schemaDataType, columnType);
     }
 
     public DataType getSchemaDataType() {
@@ -158,11 +160,21 @@ public class TableSpec {
     }
 
     public int getScale() {
-      return scale;
+      if (DataTypes.isDecimal(schemaDataType)) {
+        return ((DecimalType) schemaDataType).getScale();
+      } else if (schemaDataType == DataTypes.BYTE_ARRAY) {
+        return -1;
+      }
+      throw new UnsupportedOperationException();
     }
 
     public int getPrecision() {
-      return precision;
+      if (DataTypes.isDecimal(schemaDataType)) {
+        return ((DecimalType) schemaDataType).getPrecision();
+      } else if (schemaDataType == DataTypes.BYTE_ARRAY) {
+        return -1;
+      }
+      throw new UnsupportedOperationException();
     }
 
     @Override
@@ -170,8 +182,14 @@ public class TableSpec {
       out.writeUTF(fieldName);
       out.writeByte(schemaDataType.getId());
       out.writeByte(columnType.ordinal());
-      out.writeInt(scale);
-      out.writeInt(precision);
+      if (DataTypes.isDecimal(schemaDataType)) {
+        DecimalType decimalType = (DecimalType) schemaDataType;
+        out.writeInt(decimalType.getScale());
+        out.writeInt(decimalType.getPrecision());
+      } else {
+        out.writeInt(-1);
+        out.writeInt(-1);
+      }
     }
 
     @Override
@@ -179,8 +197,13 @@ public class TableSpec {
       this.fieldName = in.readUTF();
       this.schemaDataType = DataTypes.valueOf(in.readByte());
       this.columnType = ColumnType.valueOf(in.readByte());
-      this.scale = in.readInt();
-      this.precision = in.readInt();
+      int scale = in.readInt();
+      int precision = in.readInt();
+      if (DataTypes.isDecimal(this.schemaDataType)) {
+        DecimalType decimalType = (DecimalType) this.schemaDataType;
+        decimalType.setPrecision(precision);
+        decimalType.setScale(scale);
+      }
     }
   }
 
@@ -193,7 +216,7 @@ public class TableSpec {
     private boolean doInvertedIndex;
 
     DimensionSpec(ColumnType columnType, CarbonDimension dimension) {
-      super(dimension.getColName(), dimension.getDataType(), columnType, 0, 0);
+      super(dimension.getColName(), dimension.getDataType(), columnType);
       this.inSortColumns = dimension.isSortColumn();
       this.doInvertedIndex = dimension.isUseInvertedIndex();
     }
@@ -219,8 +242,8 @@ public class TableSpec {
 
   public class MeasureSpec extends ColumnSpec implements Writable {
 
-    MeasureSpec(String fieldName, DataType dataType, int scale, int precision) {
-      super(fieldName, dataType, ColumnType.MEASURE, scale, precision);
+    MeasureSpec(String fieldName, DataType dataType) {
+      super(fieldName, dataType, ColumnType.MEASURE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 2ac200f..c93b771 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -669,7 +669,7 @@ public class SegmentProperties {
     int k = eachDimColumnValueSize.length + eachComplexDimColumnValueSize.length;
     for (int i = 0; i < measures.size(); i++) {
       DataType dataType = measures.get(i).getDataType();
-      if (dataType.equals(DataTypes.DECIMAL)) {
+      if (DataTypes.isDecimal(dataType)) {
         dimensionValueSize[k++] = -1;
       } else {
         dimensionValueSize[k++] = 8;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 35bc560..58627d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -36,7 +36,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
 
 import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
 import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL;
 import static org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE;
 import static org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT;
 import static org.apache.carbondata.core.metadata.datatype.DataTypes.INT;
@@ -96,13 +95,6 @@ public abstract class ColumnPage {
       return BYTE_ARRAY;
     }
 
-    @Override public int getScale() {
-      return 0;
-    }
-
-    @Override public int getPrecision() {
-      return 0;
-    }
   };
 
   public SimpleStatsResult getStatistics() {
@@ -163,7 +155,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
       int pageSize) {
-    if (dataType.equals(DECIMAL)) {
+    if (DataTypes.isDecimal(dataType)) {
       return createDecimalPage(columnSpec, dataType, pageSize);
     } else if (dataType.equals(BYTE_ARRAY)) {
       return createVarLengthPage(columnSpec, dataType, pageSize);
@@ -195,7 +187,7 @@ public abstract class ColumnPage {
           dataType == DataTypes.FLOAT ||
           dataType == DataTypes.DOUBLE) {
         instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
       } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
         instance = new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
@@ -217,7 +209,7 @@ public abstract class ColumnPage {
         instance = newFloatPage(columnSpec, new float[pageSize]);
       } else if (dataType == DataTypes.DOUBLE) {
         instance = newDoublePage(columnSpec, new double[pageSize]);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         instance = newDecimalPage(columnSpec, new byte[pageSize][]);
       } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
         instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
@@ -277,7 +269,8 @@ public abstract class ColumnPage {
   }
 
   private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray) {
-    ColumnPage columnPage = createPage(columnSpec, DECIMAL, byteArray.length);
+    ColumnPage columnPage =
+        createPage(columnSpec, columnSpec.getSchemaDataType(), byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
@@ -365,7 +358,7 @@ public abstract class ColumnPage {
     } else if (dataType == DataTypes.DOUBLE) {
       putDouble(rowId, (double) value);
       statsCollector.update((double) value);
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       putDecimal(rowId, (BigDecimal) value);
       statsCollector.update((BigDecimal) value);
     } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
@@ -445,7 +438,7 @@ public abstract class ColumnPage {
       putLong(rowId, 0L);
     } else if (dataType == DataTypes.DOUBLE) {
       putDouble(rowId, 0.0);
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       putDecimal(rowId, BigDecimal.ZERO);
     } else {
       throw new IllegalArgumentException("unsupported data type: " + dataType);
@@ -586,7 +579,7 @@ public abstract class ColumnPage {
       return compressor.compressFloat(getFloatPage());
     } else if (dataType == DataTypes.DOUBLE) {
       return compressor.compressDouble(getDoublePage());
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       return compressor.compressByte(getDecimalPage());
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 11bdaca..f2cb860 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -108,7 +108,7 @@ public class LazyColumnPage extends ColumnPage {
       return decimalConverter.getDecimal(converter.decodeLong(columnPage.getShortInt(rowId)));
     } else if (dataType == DataTypes.INT) {
       return decimalConverter.getDecimal(converter.decodeLong(columnPage.getInt(rowId)));
-    } else if (dataType == DataTypes.LONG || dataType == DataTypes.DECIMAL) {
+    } else if (dataType == DataTypes.LONG || DataTypes.isDecimal(dataType)) {
       return columnPage.getDecimal(rowId);
     } else {
       throw new RuntimeException("internal error: " + this.toString());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
index b4f33b8..378b51f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -53,20 +53,18 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
         dataType == DataTypes.LONG) {
       int size = pageSize << dataType.getSizeBits();
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-      baseAddress = memoryBlock.getBaseObject();
-      baseOffset = memoryBlock.getBaseOffset();
     } else if (dataType == DataTypes.SHORT_INT) {
       int size = pageSize * 3;
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-      baseAddress = memoryBlock.getBaseObject();
-      baseOffset = memoryBlock.getBaseOffset();
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
+      memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
-      baseAddress = memoryBlock.getBaseObject();
-      baseOffset = memoryBlock.getBaseOffset();
     } else {
       throw new UnsupportedOperationException("invalid data type: " + dataType);
     }
+    baseAddress = memoryBlock.getBaseObject();
+    baseOffset = memoryBlock.getBaseOffset();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 5695b70..c88dc0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -72,7 +72,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
       memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
       baseAddress = memoryBlock.getBaseObject();
       baseOffset = memoryBlock.getBaseOffset();
-    } else if (dataType == DataTypes.DECIMAL || dataType == DataTypes.STRING) {
+    } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.STRING) {
       throw new UnsupportedOperationException("invalid data type: " + dataType);
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 60c7112..c6062c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -32,12 +32,9 @@ import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL;
-
 public abstract class VarLengthColumnPageBase extends ColumnPage {
 
-  static final int byteBits = BYTE.getSizeBits();
+  static final int byteBits = DataTypes.BYTE.getSizeBits();
   static final int shortBits = DataTypes.SHORT.getSizeBits();
   static final int intBits = DataTypes.INT.getSizeBits();
   static final int longBits = DataTypes.LONG.getSizeBits();
@@ -65,6 +62,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   // size of the allocated memory, in bytes
   int capacity;
+
   VarLengthColumnPageBase(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
     rowOffset = new int[pageSize + 1];
@@ -116,7 +114,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
             columnSpec.getScale());
     int size = decimalConverter.getSize();
     if (size < 0) {
-      return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.DECIMAL);
+      return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
+          DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()));
     } else {
       // Here the size is always fixed.
       return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
@@ -144,9 +143,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
     VarLengthColumnPageBase page;
     if (unsafe) {
-      page = new UnsafeDecimalColumnPage(columnSpec, DECIMAL, rowId);
+      page = new UnsafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
     } else {
-      page = new SafeDecimalColumnPage(columnSpec, DECIMAL, rowId);
+      page = new SafeDecimalColumnPage(columnSpec, columnSpec.getSchemaDataType(), rowId);
     }
 
     // set total length and rowOffset in page
@@ -187,7 +186,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     VarLengthColumnPageBase page;
     int inputDataLength = offset;
     if (unsafe) {
-      page = new UnsafeDecimalColumnPage(columnSpec, DECIMAL, numRows, inputDataLength);
+      page = new UnsafeDecimalColumnPage(columnSpec, dataType, numRows, inputDataLength);
     } else {
       page = new SafeDecimalColumnPage(columnSpec, dataType, numRows);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 15e26e7..dfdca02 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -148,8 +148,8 @@ public abstract class ColumnPageEncoder {
 
   private static EncodedColumnPage encodeChildColumn(byte[][] data)
       throws IOException, MemoryException {
-    TableSpec.ColumnSpec spec =
-        new TableSpec.ColumnSpec("complex_inner_column", DataTypes.BYTE_ARRAY, ColumnType.COMPLEX);
+    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance("complex_inner_column",
+        DataTypes.BYTE_ARRAY, ColumnType.COMPLEX);
     ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data);
     ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
     return encoder.encode(page);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index a38da84..659feb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -47,11 +48,6 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   // compressor name for compressing and decompressing this column
   private String compressorName;
 
-  private int scale;
-  private int precision;
-
-
-
   public ColumnPageEncoderMeta() {
   }
 
@@ -74,8 +70,6 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       setDecimal(stats.getDecimalCount());
       setMaxValue(stats.getMax());
       setMinValue(stats.getMin());
-      this.scale = stats.getScale();
-      this.precision = stats.getPrecision();
     }
   }
 
@@ -98,6 +92,12 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
     columnSpec = new TableSpec.ColumnSpec();
     columnSpec.readFields(in);
     storeDataType = DataTypes.valueOf(in.readByte());
+    if (DataTypes.isDecimal(storeDataType)) {
+      DecimalType decimalType = (DecimalType) storeDataType;
+      decimalType.setPrecision(columnSpec.getPrecision());
+      decimalType.setScale(columnSpec.getScale());
+    }
+
     setDecimal(in.readInt());
     setDataTypeSelected(in.readByte());
     readMinMax(in);
@@ -126,7 +126,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       out.writeDouble((Double) getMaxValue());
       out.writeDouble((Double) getMinValue());
       out.writeDouble(0d); // unique value is obsoleted, maintain for compatibility
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       byte[] maxAsBytes = getMaxAsBytes(columnSpec.getSchemaDataType());
       byte[] minAsBytes = getMinAsBytes(columnSpec.getSchemaDataType());
       byte[] unique = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
@@ -137,8 +137,14 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       // unique value is obsoleted, maintain for compatibility
       out.writeShort((short) unique.length);
       out.write(unique);
-      out.writeInt(scale);
-      out.writeInt(precision);
+      if (DataTypes.isDecimal(dataType)) {
+        DecimalType decimalType = (DecimalType) dataType;
+        out.writeInt(decimalType.getScale());
+        out.writeInt(decimalType.getPrecision());
+      } else {
+        out.writeInt(-1);
+        out.writeInt(-1);
+      }
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       // for complex type, it will come here, ignoring stats for complex type
       // TODO: support stats for complex type
@@ -169,7 +175,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       this.setMaxValue(in.readDouble());
       this.setMinValue(in.readDouble());
       in.readDouble(); // for non exist value which is obsoleted, it is backward compatibility;
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       byte[] max = new byte[in.readShort()];
       in.readFully(max);
       this.setMaxValue(DataTypeUtil.byteToBigDecimal(max));
@@ -179,8 +185,10 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       // unique value is obsoleted, maintain for compatiability
       short uniqueLength = in.readShort();
       in.readFully(new byte[uniqueLength]);
-      this.scale = in.readInt();
-      this.precision = in.readInt();
+      // scale field is obsoleted. It is stored in the schema data type in columnSpec
+      in.readInt();
+      // precision field is obsoleted. It is stored in the schema data type in columnSpec
+      in.readInt();
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       // for complex type, it will come here, ignoring stats for complex type
       // TODO: support stats for complex type
@@ -227,7 +235,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
       b.putDouble((double) value);
       b.flip();
       return b.array();
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
     } else if (dataType == DataTypes.STRING || dataType == DataTypes.TIMESTAMP
         || dataType == DataTypes.DATE) {
@@ -238,11 +246,17 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   }
 
   public int getScale() {
-    return scale;
+    if (DataTypes.isDecimal(columnSpec.getSchemaDataType())) {
+      return columnSpec.getScale();
+    }
+    throw new UnsupportedOperationException();
   }
 
   public int getPrecision() {
-    return precision;
+    if (DataTypes.isDecimal(columnSpec.getSchemaDataType())) {
+      return columnSpec.getPrecision();
+    }
+    throw new UnsupportedOperationException();
   }
 
   public TableSpec.ColumnSpec getColumnSpec() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 03af657..518573d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -121,7 +121,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
         dataType == DataTypes.INT ||
         dataType == DataTypes.LONG) {
       return selectCodecByAlgorithmForIntegral(stats).createEncoder(null);
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       return createEncoderForDecimalDataTypeMeasure(columnPage);
     } else if (dataType == DataTypes.FLOAT ||
         dataType == DataTypes.DOUBLE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index c298525..0f45abb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -111,8 +111,8 @@ public abstract class EncodingFactory {
    */
   public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata) {
     SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
-    TableSpec.ColumnSpec spec = new TableSpec.ColumnSpec("legacy", stats.getDataType(),
-        ColumnType.MEASURE);
+    TableSpec.ColumnSpec spec =
+        TableSpec.ColumnSpec.newInstanceLegacy("legacy", stats.getDataType(), ColumnType.MEASURE);
     String compressor = "snappy";
     DataType dataType = DataType.getDataType(metadata.getType());
     if (dataType == DataTypes.BYTE ||
@@ -155,7 +155,7 @@ public abstract class EncodingFactory {
       } else {
         throw new RuntimeException("internal error");
       }
-    } else if (dataType == DataTypes.DECIMAL || dataType == DataTypes.BYTE_ARRAY) {
+    } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.BYTE_ARRAY) {
       // no dictionary dimension
       return new DirectCompressCodec(stats.getDataType())
           .createDecoder(new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 96f7b16..a543f7e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -61,7 +61,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       this.max = (long) stats.getMax();
     } else if (srcDataType == DataTypes.DOUBLE) {
       this.max = (long) (double) stats.getMax();
-    } else if (srcDataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(srcDataType)) {
       this.max = ((BigDecimal) stats.getMax()).unscaledValue().longValue();
     } else {
       // this codec is for integer type only
@@ -114,7 +114,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       @Override public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
         ColumnPage page = null;
-        if (meta.getSchemaDataType() == DataTypes.DECIMAL) {
+        if (DataTypes.isDecimal(meta.getSchemaDataType())) {
           page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
         } else {
           page = ColumnPage.decompress(meta, input, offset, length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index 907649d..b65296d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -94,7 +94,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
       public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
         ColumnPage page = null;
-        if (meta.getSchemaDataType() == DataTypes.DECIMAL) {
+        if (DataTypes.isDecimal(meta.getSchemaDataType())) {
           page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
         } else {
           page = ColumnPage.decompress(meta, input, offset, length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index b3d282e..cfdf114 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -103,7 +103,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
     @Override
     public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException {
       ColumnPage decodedPage;
-      if (dataType == DataTypes.DECIMAL) {
+      if (DataTypes.isDecimal(dataType)) {
         decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
       } else {
         decodedPage = ColumnPage.decompress(meta, input, offset, length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
index be47966..e6cf29e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/KeyPageStatsCollector.java
@@ -106,13 +106,6 @@ public class KeyPageStatsCollector implements ColumnPageStatsCollector {
         return dataType;
       }
 
-      @Override public int getScale() {
-        return 0;
-      }
-
-      @Override public int getPrecision() {
-        return 0;
-      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
index 20e10b8..61acec9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
@@ -119,13 +119,6 @@ public class LVStringStatsCollector implements ColumnPageStatsCollector {
         return DataTypes.STRING;
       }
 
-      @Override public int getScale() {
-        return 0;
-      }
-
-      @Override public int getPrecision() {
-        return 0;
-      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
index ed92622..76cb002 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -37,7 +37,6 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   private long minLong, maxLong;
   private double minDouble, maxDouble;
   private BigDecimal minDecimal, maxDecimal;
-  private int scale, precision;
 
   // scale of the double value, apply adaptive encoding if this is positive
   private int decimal;
@@ -46,15 +45,14 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
   private BigDecimal zeroDecimal;
 
   // this is for encode flow
-  public static PrimitivePageStatsCollector newInstance(DataType dataType,
-      int scale, int precision) {
-    return new PrimitivePageStatsCollector(dataType, scale, precision);
+  public static PrimitivePageStatsCollector newInstance(DataType dataType) {
+    return new PrimitivePageStatsCollector(dataType);
   }
 
   // this is for decode flow, create stats from encoder meta in carbondata file
   public static PrimitivePageStatsCollector newInstance(ColumnPageEncoderMeta meta) {
-    PrimitivePageStatsCollector instance = new PrimitivePageStatsCollector(meta.getSchemaDataType(),
-        meta.getScale(), meta.getPrecision());
+    PrimitivePageStatsCollector instance =
+        new PrimitivePageStatsCollector(meta.getSchemaDataType());
     // set min max from meta
     DataType dataType = meta.getSchemaDataType();
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
@@ -73,12 +71,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       instance.minDouble = (double) meta.getMinValue();
       instance.maxDouble = (double) meta.getMaxValue();
       instance.decimal = meta.getDecimal();
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       instance.minDecimal = (BigDecimal) meta.getMinValue();
       instance.maxDecimal = (BigDecimal) meta.getMaxValue();
       instance.decimal = meta.getDecimal();
-      instance.scale = meta.getScale();
-      instance.precision = meta.getPrecision();
     } else {
       throw new UnsupportedOperationException(
           "unsupported data type for stats collection: " + meta.getSchemaDataType());
@@ -88,7 +84,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
 
   public static PrimitivePageStatsCollector newInstance(ValueEncoderMeta meta) {
     PrimitivePageStatsCollector instance =
-        new PrimitivePageStatsCollector(DataType.getDataType(meta.getType()), -1, -1);
+        new PrimitivePageStatsCollector(DataType.getDataType(meta.getType()));
     // set min max from meta
     DataType dataType = DataType.getDataType(meta.getType());
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
@@ -107,12 +103,10 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       instance.minDouble = (double) meta.getMinValue();
       instance.maxDouble = (double) meta.getMaxValue();
       instance.decimal = meta.getDecimal();
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       instance.minDecimal = (BigDecimal) meta.getMinValue();
       instance.maxDecimal = (BigDecimal) meta.getMaxValue();
       instance.decimal = meta.getDecimal();
-      instance.scale = -1;
-      instance.precision = -1;
     } else {
       throw new UnsupportedOperationException(
           "unsupported data type for Stats collection: " + meta.getType());
@@ -120,7 +114,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     return instance;
   }
 
-  private PrimitivePageStatsCollector(DataType dataType, int scale, int precision) {
+  private PrimitivePageStatsCollector(DataType dataType) {
     this.dataType = dataType;
     if (dataType == DataTypes.BOOLEAN) {
       minByte = TRUE_VALUE;
@@ -141,11 +135,9 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       minDouble = Double.POSITIVE_INFINITY;
       maxDouble = Double.NEGATIVE_INFINITY;
       decimal = 0;
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       this.zeroDecimal = BigDecimal.ZERO;
-      decimal = scale;
-      this.scale = scale;
-      this.precision = precision;
+      decimal = 0;
     } else {
       throw new UnsupportedOperationException(
           "unsupported data type for Stats collection: " + dataType);
@@ -165,7 +157,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       update(value);
     } else if (dataType == DataTypes.DOUBLE) {
       update(0d);
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       if (isFirst) {
         maxDecimal = zeroDecimal;
         minDecimal = zeroDecimal;
@@ -306,7 +298,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       return minLong;
     } else if (dataType == DataTypes.DOUBLE) {
       return minDouble;
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       return minDecimal;
     }
     return null;
@@ -324,7 +316,7 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
       return maxLong;
     } else if (dataType == DataTypes.DOUBLE) {
       return maxDouble;
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       return maxDecimal;
     }
     return null;
@@ -340,14 +332,4 @@ public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, Si
     return dataType;
   }
 
-  @Override
-  public int getScale() {
-    return scale;
-  }
-
-  @Override
-  public int getPrecision() {
-    return precision;
-  }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
index 65cd40f..0e1f650 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
@@ -29,7 +29,4 @@ public interface SimpleStatsResult {
 
   DataType getDataType();
 
-  int getScale();
-
-  int getPrecision();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 3e083cc..d5bd695 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -206,7 +206,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
         } else if (dataType == DataTypes.LONG) {
           buffer.putLong(Long.MIN_VALUE);
           updatedValues[minValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           updatedValues[minValues.length + i] =
               DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MIN_VALUE));
         } else {
@@ -244,7 +244,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
         } else if (dataType == DataTypes.LONG) {
           buffer.putLong(Long.MAX_VALUE);
           updatedValues[maxValues.length + i] = buffer.array().clone();
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           updatedValues[maxValues.length + i] =
               DataTypeUtil.bigDecimalToByte(BigDecimal.valueOf(Long.MAX_VALUE));
         } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 7faa7e6..70a6e63 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -139,7 +139,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       return org.apache.carbondata.format.DataType.LONG;
     } else if (dataType.getId() == DataTypes.DOUBLE.getId()) {
       return org.apache.carbondata.format.DataType.DOUBLE;
-    } else if (dataType.getId() == DataTypes.DECIMAL.getId()) {
+    } else if (DataTypes.isDecimal(dataType)) {
       return org.apache.carbondata.format.DataType.DECIMAL;
     } else if (dataType.getId() == DataTypes.DATE.getId()) {
       return org.apache.carbondata.format.DataType.DATE;
@@ -167,12 +167,21 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     }
     org.apache.carbondata.format.ColumnSchema thriftColumnSchema =
         new org.apache.carbondata.format.ColumnSchema(
-            fromWrapperToExternalDataType(wrapperColumnSchema.getDataType()),
-            wrapperColumnSchema.getColumnName(), wrapperColumnSchema.getColumnUniqueId(),
-            wrapperColumnSchema.isColumnar(), encoders, wrapperColumnSchema.isDimensionColumn());
+            fromWrapperToExternalDataType(
+                wrapperColumnSchema.getDataType()),
+            wrapperColumnSchema.getColumnName(),
+            wrapperColumnSchema.getColumnUniqueId(),
+            wrapperColumnSchema.isColumnar(),
+            encoders,
+            wrapperColumnSchema.isDimensionColumn());
     thriftColumnSchema.setColumn_group_id(wrapperColumnSchema.getColumnGroupId());
-    thriftColumnSchema.setScale(wrapperColumnSchema.getScale());
-    thriftColumnSchema.setPrecision(wrapperColumnSchema.getPrecision());
+    if (DataTypes.isDecimal(wrapperColumnSchema.getDataType())) {
+      thriftColumnSchema.setScale(wrapperColumnSchema.getScale());
+      thriftColumnSchema.setPrecision(wrapperColumnSchema.getPrecision());
+    } else {
+      thriftColumnSchema.setScale(-1);
+      thriftColumnSchema.setPrecision(-1);
+    }
     thriftColumnSchema.setNum_child(wrapperColumnSchema.getNumberOfChild());
     thriftColumnSchema.setDefault_value(wrapperColumnSchema.getDefaultValue());
     thriftColumnSchema.setColumnProperties(wrapperColumnSchema.getColumnProperties());
@@ -358,7 +367,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
    * @param dataType
    * @return
    */
-  private DataType fromExternalToWrapperDataType(org.apache.carbondata.format.DataType dataType) {
+  private DataType fromExternalToWrapperDataType(org.apache.carbondata.format.DataType dataType,
+      int precision, int scale) {
     if (null == dataType) {
       return null;
     }
@@ -376,7 +386,7 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       case DOUBLE:
         return DataTypes.DOUBLE;
       case DECIMAL:
-        return DataTypes.DECIMAL;
+        return DataTypes.createDecimalType(precision, scale);
       case TIMESTAMP:
         return DataTypes.TIMESTAMP;
       case DATE:
@@ -399,7 +409,11 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
     wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
     wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
     wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
-    wrapperColumnSchema.setDataType(fromExternalToWrapperDataType(externalColumnSchema.data_type));
+    wrapperColumnSchema.setDataType(
+        fromExternalToWrapperDataType(
+            externalColumnSchema.data_type,
+            externalColumnSchema.precision,
+            externalColumnSchema.scale));
     wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
     List<Encoding> encoders = new ArrayList<Encoding>();
     for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index b3dd1bc..8e08436 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -85,7 +85,7 @@ public class DataType implements Serializable {
       return BIG_INT_MEASURE_CHAR;
     } else if (dataType == DataTypes.DOUBLE) {
       return DOUBLE_MEASURE_CHAR;
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       return BIG_DECIMAL_MEASURE_CHAR;
     } else if (dataType == DataTypes.STRING) {
       return STRING_CHAR;
@@ -107,11 +107,12 @@ public class DataType implements Serializable {
       case DOUBLE_MEASURE_CHAR:
         return DataTypes.DOUBLE;
       case BIG_DECIMAL_MEASURE_CHAR:
-        return DataTypes.DECIMAL;
+        return DataTypes.createDefaultDecimalType();
       case 'l':
         return DataTypes.LEGACY_LONG;
       default:
         throw new RuntimeException("Unexpected type: " + type);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
index 178f06a..43dad72 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
@@ -44,7 +44,6 @@ public class DataTypes {
   // Only for internal use for backward compatability. It is only used for V1 version
   public static final DataType LEGACY_LONG = LegacyLongType.LEGACY_LONG;
 
-  public static final DataType DECIMAL = DecimalType.DECIMAL;
   public static final DataType ARRAY = ArrayType.ARRAY;
   public static final DataType STRUCT = StructType.STRUCT;
   public static final DataType MAP = MapType.MAP;
@@ -99,8 +98,8 @@ public class DataTypes {
       return DOUBLE;
     } else if (id == NULL.getId()) {
       return NULL;
-    } else if (id == DECIMAL.getId()) {
-      return DECIMAL;
+    } else if (id == DECIMAL_TYPE_ID) {
+      return createDefaultDecimalType();
     } else if (id == ARRAY.getId()) {
       return ARRAY;
     } else if (id == STRUCT.getId()) {
@@ -114,4 +113,22 @@ public class DataTypes {
     }
   }
 
+  /**
+   * create a decimal type object with specified precision and scale
+   */
+  public static DecimalType createDecimalType(int precision, int scale) {
+    return new DecimalType(precision, scale);
+  }
+
+  /**
+   * create a decimal type object with default precision = 10 and scale = 2
+   */
+  public static DecimalType createDefaultDecimalType() {
+    return new DecimalType(10, 2);
+  }
+
+  public static boolean isDecimal(DataType dataType) {
+    return dataType.getId() == DECIMAL_TYPE_ID;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index 9dbc9b4..e4059c8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
 
-import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -73,19 +72,17 @@ public final class DecimalConverterFactory {
 
     BigDecimal getDecimal(Object valueToBeConverted);
 
-    void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId);
-
     int getSize();
 
     DecimalConverterType getDecimalConverterType();
 
   }
 
-  public class DecimalIntConverter implements DecimalConverter {
+  public static class DecimalIntConverter implements DecimalConverter {
 
     private int scale;
 
-    public DecimalIntConverter(int precision, int scale) {
+    DecimalIntConverter(int scale) {
       this.scale = scale;
     }
 
@@ -98,11 +95,6 @@ public final class DecimalConverterFactory {
       return BigDecimal.valueOf((Long) valueToBeConverted, scale);
     }
 
-    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
-      long unscaled = getUnscaledLong(bytes);
-      vector.putInt(rowId, (int) unscaled);
-    }
-
     @Override public int getSize() {
       return 4;
     }
@@ -126,28 +118,22 @@ public final class DecimalConverterFactory {
     return unscaled;
   }
 
-  public class DecimalLongConverter implements DecimalConverter {
+  public static class DecimalLongConverter implements DecimalConverter {
 
     private int scale;
 
-    public DecimalLongConverter(int precision, int scale) {
+    DecimalLongConverter(int scale) {
       this.scale = scale;
     }
 
     @Override public Object convert(BigDecimal decimal) {
-      long longValue = decimal.unscaledValue().longValue();
-      return longValue;
+      return decimal.unscaledValue().longValue();
     }
 
     @Override public BigDecimal getDecimal(Object valueToBeConverted) {
       return BigDecimal.valueOf((Long) valueToBeConverted, scale);
     }
 
-    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
-      long unscaled = getUnscaledLong(bytes);
-      vector.putLong(rowId, unscaled);
-    }
-
     @Override public int getSize() {
       return 8;
     }
@@ -159,14 +145,13 @@ public final class DecimalConverterFactory {
 
   public class DecimalUnscaledConverter implements DecimalConverter {
 
-
     private int scale;
 
     private int numBytes;
 
     private byte[] decimalBuffer = new byte[minBytesForPrecision[38]];
 
-    public DecimalUnscaledConverter(int precision, int scale) {
+    DecimalUnscaledConverter(int precision, int scale) {
       this.scale = scale;
       this.numBytes = minBytesForPrecision[precision];
     }
@@ -202,10 +187,6 @@ public final class DecimalConverterFactory {
       return new BigDecimal(bigInteger, scale);
     }
 
-    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
-      vector.putBytes(rowId, bytes);
-    }
-
     @Override public int getSize() {
       return numBytes;
     }
@@ -227,10 +208,6 @@ public final class DecimalConverterFactory {
       return DataTypeUtil.byteToBigDecimal((byte[]) valueToBeConverted);
     }
 
-    @Override public void writeToColumnVector(byte[] bytes, CarbonColumnVector vector, int rowId) {
-      throw new UnsupportedOperationException("Unsupported in vector reading for legacy format");
-    }
-
     @Override public int getSize() {
       return -1;
     }
@@ -244,9 +221,9 @@ public final class DecimalConverterFactory {
     if (precision < 0) {
       return new LVBytesDecimalConverter();
     } else if (precision <= 9) {
-      return new DecimalIntConverter(precision, scale);
+      return new DecimalIntConverter(scale);
     } else if (precision <= 18) {
-      return new DecimalLongConverter(precision, scale);
+      return new DecimalLongConverter(scale);
     } else {
       return new DecimalUnscaledConverter(precision, scale);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
index 0c78e50..b2acd21 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalType.java
@@ -19,15 +19,29 @@ package org.apache.carbondata.core.metadata.datatype;
 
 public class DecimalType extends DataType {
 
-  public static final DataType DECIMAL =
-      new DecimalType(DataTypes.DECIMAL_TYPE_ID, 8, "DECIMAL", -1);
+  private int precision;
+  private int scale;
 
-  private DecimalType(int id, int precedenceOrder, String name, int sizeInBytes) {
-    super(id, precedenceOrder, name, sizeInBytes);
+  // create a decimal type object with specified precision and scale
+  DecimalType(int precision, int scale) {
+    super(DataTypes.DECIMAL_TYPE_ID, 8, "DECIMAL", -1);
+    this.precision = precision;
+    this.scale = scale;
   }
 
-  // this function is needed to ensure singleton pattern while supporting java serialization
-  private Object readResolve() {
-    return DataTypes.DECIMAL;
+  public int getPrecision() {
+    return precision;
+  }
+
+  public void setPrecision(int precision) {
+    this.precision = precision;
+  }
+
+  public int getScale() {
+    return scale;
+  }
+
+  public void setScale(int scale) {
+    this.scale = scale;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index bd246a4..cad3dd6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.WritableUtil;
@@ -212,6 +213,16 @@ public class ColumnSchema implements Serializable, Writable {
   }
 
   /**
+   * Set the scale if it is decimal type
+   */
+  public void setScale(int scale) {
+    this.scale = scale;
+    if (DataTypes.isDecimal(dataType)) {
+      ((DecimalType) dataType).setScale(scale);
+    }
+  }
+
+  /**
    * @return the scale
    */
   public int getScale() {
@@ -219,10 +230,13 @@ public class ColumnSchema implements Serializable, Writable {
   }
 
   /**
-   * @param scale the scale to set
+   * Set the precision if it is decimal type
    */
-  public void setScale(int scale) {
-    this.scale = scale;
+  public void setPrecision(int precision) {
+    this.precision = precision;
+    if (DataTypes.isDecimal(dataType)) {
+      ((DecimalType) dataType).setPrecision(precision);
+    }
   }
 
   /**
@@ -233,13 +247,6 @@ public class ColumnSchema implements Serializable, Writable {
   }
 
   /**
-   * @param precision the precision to set
-   */
-  public void setPrecision(int precision) {
-    this.precision = precision;
-  }
-
-  /**
    * @return the getNumberOfChild
    */
   public int getNumberOfChild() {
@@ -423,8 +430,13 @@ public class ColumnSchema implements Serializable, Writable {
       }
     }
     out.writeBoolean(isDimensionColumn);
-    out.writeInt(scale);
-    out.writeInt(precision);
+    if (DataTypes.isDecimal(dataType)) {
+      out.writeInt(((DecimalType) dataType).getScale());
+      out.writeInt(((DecimalType) dataType).getPrecision());
+    } else {
+      out.writeInt(-1);
+      out.writeInt(-1);
+    }
     out.writeInt(schemaOrdinal);
     out.writeInt(numberOfChild);
     WritableUtil.writeByteArray(out, defaultValue);
@@ -457,6 +469,11 @@ public class ColumnSchema implements Serializable, Writable {
     this.isDimensionColumn = in.readBoolean();
     this.scale = in.readInt();
     this.precision = in.readInt();
+    if (DataTypes.isDecimal(dataType)) {
+      DecimalType decimalType = (DecimalType) dataType;
+      decimalType.setPrecision(precision);
+      decimalType.setScale(scale);
+    }
     this.schemaOrdinal = in.readInt();
     this.numberOfChild = in.readInt();
     this.defaultValue = WritableUtil.readByteArray(in);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index 7e24413..dc78ac6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -78,7 +78,7 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
       } else {
         // if not then get the default value and use that value in aggregation
         Object defaultValue = measureInfo.getDefaultValues()[i];
-        if (null != defaultValue && measureInfo.getMeasureDataTypes()[i] == DataTypes.DECIMAL) {
+        if (null != defaultValue && DataTypes.isDecimal(measureInfo.getMeasureDataTypes()[i])) {
           // convert data type as per the computing engine
           defaultValue = DataTypeUtil.getDataTypeConverter().convertToDecimal(defaultValue);
         }
@@ -99,7 +99,7 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
         return (int) dataChunk.getLong(index);
       } else if (dataType == DataTypes.LONG) {
         return dataChunk.getLong(index);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         BigDecimal bigDecimalMsrValue = dataChunk.getDecimal(index);
         if (null != bigDecimalMsrValue && carbonMeasure.getScale() > bigDecimalMsrValue.scale()) {
           bigDecimalMsrValue =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
index cbde2e1..d9b7b23 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedVectorResultCollector.java
@@ -236,7 +236,7 @@ public class RestructureBasedVectorResultCollector extends DictionaryBasedVector
           } else if (dataType == DataTypes.LONG) {
             vector.putLongs(columnVectorInfo.vectorOffset, columnVectorInfo.size,
                 (long) defaultValue);
-          } else if (dataType == DataTypes.DECIMAL) {
+          } else if (DataTypes.isDecimal(dataType)) {
             vector.putDecimals(columnVectorInfo.vectorOffset, columnVectorInfo.size,
                 (Decimal) defaultValue, measure.getPrecision());
           } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index ddc75ff..b3a77b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -292,7 +292,7 @@ public class RestructureUtil {
       if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
         value = new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         measureDefaultValue = Long.parseLong(value);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         BigDecimal decimal = DataTypeUtil.byteToBigDecimal(defaultValue);
         if (columnSchema.getScale() > decimal.scale()) {
           decimal = decimal.setScale(columnSchema.getScale(), RoundingMode.HALF_UP);
@@ -331,7 +331,7 @@ public class RestructureUtil {
       } else if (dataType == DataTypes.LONG) {
         value = new String(defaultValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         measureDefaultValue = Long.parseLong(value);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         BigDecimal decimal = DataTypeUtil.byteToBigDecimal(defaultValue);
         if (columnSchema.getScale() > decimal.scale()) {
           decimal = decimal.setScale(columnSchema.getScale(), RoundingMode.HALF_UP);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java
index e3892be..262904f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/ExpressionResult.java
@@ -308,7 +308,7 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
         return new BigDecimal((int) value);
       } else if (dataType == DataTypes.LONG) {
         return new BigDecimal((long) value);
-      } else if (dataType == DataTypes.DOUBLE || dataType == DataTypes.DECIMAL) {
+      } else if (dataType == DataTypes.DOUBLE || DataTypes.isDecimal(dataType)) {
         return new BigDecimal(value.toString());
       } else if (dataType == DataTypes.DATE) {
         if (value instanceof java.sql.Date) {
@@ -494,7 +494,7 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
         result = this.getLong().equals(objToCompare.getLong());
       } else if (dataType == DataTypes.DOUBLE) {
         result = this.getDouble().equals(objToCompare.getDouble());
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         result = this.getDecimal().equals(objToCompare.getDecimal());
       }
     } catch (FilterIllegalMemberException ex) {
@@ -518,7 +518,7 @@ public class ExpressionResult implements Comparable<ExpressionResult> {
         Double d1 = this.getDouble();
         Double d2 = o.getDouble();
         return d1.compareTo(d2);
-      } else if (type == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(type)) {
         java.math.BigDecimal val1 = this.getDecimal();
         java.math.BigDecimal val2 = o.getDecimal();
         return val1.compareTo(val2);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
index f143189..faf5bb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/EqualToExpression.java
@@ -81,7 +81,7 @@ public class EqualToExpression extends BinaryConditionalExpression {
       result = val1.getTime().equals(val2.getTime());
     } else if (dataType == DataTypes.LONG) {
       result = val1.getLong().equals(val2.getLong());
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       result = val1.getDecimal().compareTo(val2.getDecimal()) == 0;
     } else {
       throw new FilterUnsupportedException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
index 1472959..24575d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanEqualToExpression.java
@@ -64,7 +64,7 @@ public class GreaterThanEqualToExpression extends BinaryConditionalExpression {
       result = elRes.getTime() >= (erRes.getTime());
     } else if (dataType == DataTypes.LONG) {
       result = elRes.getLong() >= (erRes.getLong());
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       result = elRes.getDecimal().compareTo(erRes.getDecimal()) >= 0;
     } else  {
       throw new FilterUnsupportedException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
index b8a8a7c..ddc3d30 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/GreaterThanExpression.java
@@ -66,7 +66,7 @@ public class GreaterThanExpression extends BinaryConditionalExpression {
       result = exprLeftRes.getTime() > (exprRightRes.getTime());
     } else if (dataType == DataTypes.LONG) {
       result = exprLeftRes.getLong() > (exprRightRes.getLong());
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       result = exprLeftRes.getDecimal().compareTo(exprRightRes.getDecimal()) > 0;
     } else {
       throw new FilterUnsupportedException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
index 7243741..a560cc3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/InExpression.java
@@ -68,7 +68,7 @@ public class InExpression extends BinaryConditionalExpression {
           val = new ExpressionResult(val.getDataType(), expressionResVal.getLong());
         } else if (dataType == DataTypes.DATE || dataType == DataTypes.TIMESTAMP) {
           val = new ExpressionResult(val.getDataType(), expressionResVal.getTime());
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           val = new ExpressionResult(val.getDataType(), expressionResVal.getDecimal());
         } else {
           throw new FilterUnsupportedException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
index 6a9fc3c..df7d791 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanEqualToExpression.java
@@ -64,7 +64,7 @@ public class LessThanEqualToExpression extends BinaryConditionalExpression {
       result = elRes.getTime() <= (erRes.getTime());
     } else if (dataType == DataTypes.LONG) {
       result = elRes.getLong() <= (erRes.getLong());
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       result = elRes.getDecimal().compareTo(erRes.getDecimal()) <= 0;
     } else {
       throw new FilterUnsupportedException("DataType: " + exprResValue1.getDataType()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
index 4283d83..f4b7f7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/LessThanExpression.java
@@ -68,7 +68,7 @@ public class LessThanExpression extends BinaryConditionalExpression {
       result = elRes.getTime() < (erRes.getTime());
     } else if (dataType == DataTypes.LONG) {
       result = elRes.getLong() < (erRes.getLong());
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       result = elRes.getDecimal().compareTo(erRes.getDecimal()) < 0;
     } else {
       throw new FilterUnsupportedException("DataType: " + val1.getDataType() +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
index 5055caf..5046722 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotEqualsExpression.java
@@ -77,7 +77,7 @@ public class NotEqualsExpression extends BinaryConditionalExpression {
       result = val1.getTime().longValue() != val2.getTime().longValue();
     } else if (dataType == DataTypes.LONG) {
       result = elRes.getLong().longValue() != (erRes.getLong()).longValue();
-    } else if (dataType == DataTypes.DECIMAL) {
+    } else if (DataTypes.isDecimal(dataType)) {
       result = elRes.getDecimal().compareTo(erRes.getDecimal()) != 0;
     } else {
       throw new FilterUnsupportedException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
index 89a0374..c4a2fc8 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/NotInExpression.java
@@ -89,7 +89,7 @@ public class NotInExpression extends BinaryConditionalExpression {
           val = new ExpressionResult(val.getDataType(), exprResVal.getTime());
         } else if (dataType == DataTypes.LONG) {
           val = new ExpressionResult(val.getDataType(), exprResVal.getLong());
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           val = new ExpressionResult(val.getDataType(), exprResVal.getDecimal());
         } else {
           throw new FilterUnsupportedException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index ecac617..6943b8b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1318,7 +1318,7 @@ public final class FilterUtil {
         dateToStr = parser.parse(memberVal);
         dictionaryDate = parser.parse(dictionaryVal);
         return dictionaryDate.compareTo(dateToStr);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         java.math.BigDecimal javaDecValForDictVal = new java.math.BigDecimal(dictionaryVal);
         java.math.BigDecimal javaDecValForMemberVal = new java.math.BigDecimal(memberVal);
         return javaDecValForDictVal.compareTo(javaDecValForMemberVal);
@@ -1430,7 +1430,7 @@ public final class FilterUtil {
         Double d1 = Double.parseDouble(filterMember1);
         Double d2 = Double.parseDouble(filterMember2);
         return d1.compareTo(d2);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(filterMember1)) {
           return 1;
         }