You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:22:03 UTC

[13/35] carbondata git commit: [CARBONDATA-1539] Change data type from enum to class

[CARBONDATA-1539] Change data type from enum to class

DataType should be java class instead of enum, to hold more information for decimal and complex type. And it is needed to decouple carbon core and spark.

No logic is changed in this PR.

This closes #1402


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/956833e5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/956833e5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/956833e5

Branch: refs/heads/streaming_ingest
Commit: 956833e5525742616d8a0e6b885132acd7291b76
Parents: 75e0bd4
Author: Jacky Li <ja...@qq.com>
Authored: Sun Oct 8 22:06:01 2017 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Oct 13 14:56:00 2017 +0530

----------------------------------------------------------------------
 .../cache/dictionary/ColumnDictionaryInfo.java  |  60 +-
 .../core/constants/CarbonCommonConstants.java   |   4 +-
 .../core/datamap/dev/DataMapFactory.java        |   1 -
 .../carbondata/core/datastore/TableSpec.java    |   7 +-
 .../core/datastore/block/SegmentProperties.java |   5 +-
 .../impl/FixedLengthDimensionDataChunk.java     |  40 +-
 .../core/datastore/impl/FileFactory.java        |   6 +-
 .../core/datastore/page/ColumnPage.java         | 349 +++++-----
 .../core/datastore/page/LazyColumnPage.java     |  89 +--
 .../datastore/page/SafeDecimalColumnPage.java   |  33 +-
 .../datastore/page/SafeFixLengthColumnPage.java |  62 +-
 .../datastore/page/UnsafeDecimalColumnPage.java |  81 ++-
 .../page/UnsafeFixLengthColumnPage.java         | 123 ++--
 .../datastore/page/VarLengthColumnPageBase.java |  15 +-
 .../page/encoding/ColumnPageEncoder.java        |   6 +-
 .../page/encoding/ColumnPageEncoderMeta.java    | 243 ++++---
 .../page/encoding/DefaultEncodingFactory.java   | 122 ++--
 .../page/encoding/EncodingFactory.java          | 109 ++--
 .../adaptive/AdaptiveDeltaIntegralCodec.java    | 192 +++---
 .../adaptive/AdaptiveFloatingCodec.java         |  66 +-
 .../adaptive/AdaptiveIntegralCodec.java         | 140 ++--
 .../encoding/compress/DirectCompressCodec.java  |   3 +-
 .../datastore/page/encoding/rle/RLECodec.java   | 113 ++--
 .../page/statistics/LVStringStatsCollector.java |   3 +-
 .../statistics/PrimitivePageStatsCollector.java | 307 ++++-----
 .../core/indexstore/UnsafeMemoryDMStore.java    |  87 ++-
 .../blockletindex/BlockletDataMap.java          | 107 ++--
 .../core/indexstore/row/DataMapRowImpl.java     |  18 +-
 .../DirectDictionaryKeyGeneratorFactory.java    |  14 +-
 .../DateDirectDictionaryGenerator.java          |   9 +-
 .../TimeStampDirectDictionaryGenerator.java     |  15 +-
 .../core/memory/HeapMemoryAllocator.java        |   2 +-
 .../ThriftWrapperSchemaConverterImpl.java       |  70 +-
 .../core/metadata/datatype/ArrayType.java       |  37 ++
 .../core/metadata/datatype/BooleanType.java     |  33 +
 .../core/metadata/datatype/ByteArrayType.java   |  33 +
 .../core/metadata/datatype/ByteType.java        |  32 +
 .../core/metadata/datatype/DataType.java        | 156 ++---
 .../core/metadata/datatype/DataTypes.java       | 117 ++++
 .../core/metadata/datatype/DateType.java        |  32 +
 .../core/metadata/datatype/DecimalType.java     |  33 +
 .../core/metadata/datatype/DoubleType.java      |  32 +
 .../core/metadata/datatype/FloatType.java       |  31 +
 .../core/metadata/datatype/IntType.java         |  32 +
 .../core/metadata/datatype/LegacyLongType.java  |  33 +
 .../core/metadata/datatype/LongType.java        |  32 +
 .../core/metadata/datatype/MapType.java         |  37 ++
 .../core/metadata/datatype/NullType.java        |  32 +
 .../core/metadata/datatype/ShortIntType.java    |  38 ++
 .../core/metadata/datatype/ShortType.java       |  32 +
 .../core/metadata/datatype/StringType.java      |  31 +
 .../core/metadata/datatype/StructType.java      |  37 ++
 .../core/metadata/datatype/TimestampType.java   |  32 +
 .../core/metadata/schema/table/CarbonTable.java |   3 -
 .../schema/table/column/CarbonColumn.java       |   2 +-
 .../table/column/CarbonImplicitDimension.java   |   3 +-
 .../schema/table/column/ColumnSchema.java       |  23 +-
 .../impl/AbstractScannedResultCollector.java    |  38 +-
 .../impl/DictionaryBasedResultCollector.java    |   6 +-
 .../RestructureBasedRawResultCollector.java     |   4 +-
 .../RestructureBasedVectorResultCollector.java  |  63 +-
 .../core/scan/complextypes/ArrayQueryType.java  |   5 +-
 .../scan/complextypes/PrimitiveQueryType.java   |  29 +-
 .../executor/impl/AbstractQueryExecutor.java    |   3 +-
 .../core/scan/executor/util/QueryUtil.java      |  61 +-
 .../scan/executor/util/RestructureUtil.java     | 122 ++--
 .../core/scan/expression/ExpressionResult.java  | 572 ++++++++---------
 .../expression/RangeExpressionEvaluator.java    |   5 +-
 .../conditional/EqualToExpression.java          |  51 +-
 .../GreaterThanEqualToExpression.java           |  49 +-
 .../conditional/GreaterThanExpression.java      |  49 +-
 .../expression/conditional/InExpression.java    |  49 +-
 .../conditional/LessThanEqualToExpression.java  |  49 +-
 .../conditional/LessThanExpression.java         |  49 +-
 .../conditional/NotEqualsExpression.java        |  51 +-
 .../expression/conditional/NotInExpression.java |  53 +-
 .../scan/expression/logical/AndExpression.java  |  14 +-
 .../expression/logical/FalseExpression.java     |   4 +-
 .../scan/expression/logical/OrExpression.java   |  15 +-
 .../expression/logical/RangeExpression.java     |  14 +-
 .../scan/expression/logical/TrueExpression.java |   4 +-
 .../scan/filter/FilterExpressionProcessor.java  |  14 +-
 .../carbondata/core/scan/filter/FilterUtil.java | 151 ++---
 .../executer/ExcludeFilterExecuterImpl.java     |  22 +-
 .../executer/IncludeFilterExecuterImpl.java     |  22 +-
 .../executer/RangeValueFilterExecuterImpl.java  |   4 +-
 .../executer/RestructureEvaluatorImpl.java      |   4 +-
 .../executer/RowLevelFilterExecuterImpl.java    |  96 ++-
 ...velRangeLessThanEqualFilterExecuterImpl.java |   3 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |   3 +-
 .../resolver/ConditionalFilterResolverImpl.java |  10 +-
 .../RowLevelRangeFilterResolverImpl.java        |   4 +-
 .../visitor/CustomTypeDictionaryVisitor.java    |   3 +-
 .../core/scan/partition/PartitionUtil.java      |  76 ++-
 .../core/scan/partition/RangePartitioner.java   |   3 +-
 .../vector/MeasureDataVectorProcessor.java      |  22 +-
 .../util/AbstractDataFileFooterConverter.java   |  23 +-
 .../core/util/CarbonMetadataUtil.java           |  48 +-
 .../apache/carbondata/core/util/CarbonUtil.java | 109 ++--
 .../carbondata/core/util/DataTypeUtil.java      | 641 +++++++++----------
 .../core/util/comparator/Comparator.java        |  54 +-
 .../sortindex/CarbonDictionarySortModel.java    | 109 ++--
 .../dictionary/AbstractDictionaryCacheTest.java |   5 +-
 .../dictionary/ColumnDictionaryInfoTest.java    |  45 +-
 .../DictionaryCacheLoaderImplTest.java          |   6 +-
 .../DictionaryColumnUniqueIdentifierTest.java   |  26 +-
 .../cache/dictionary/ForwardDictionaryTest.java |   3 +-
 .../dictionary/ReverseDictionaryCacheTest.java  |   3 +-
 .../core/carbon/ColumnIdentifierTest.java       |  11 +-
 .../datastore/block/SegmentPropertiesTest.java  |  28 +-
 .../block/SegmentPropertiesTestUtil.java        |  25 +-
 .../datastore/page/encoding/RLECodecSuite.java  |  11 +-
 ...ctDictionaryKeyGeneratorFactoryUnitTest.java |   5 +-
 .../core/metadata/CarbonMetadataTest.java       |   5 +-
 .../ThriftWrapperSchemaConverterImplTest.java   |  33 +-
 .../metadata/schema/table/CarbonTableTest.java  |   5 +-
 .../table/CarbonTableWithComplexTypesTest.java  |  16 +-
 .../impl/RawBasedResultCollectorTest.java       |   2 +-
 .../complextypes/PrimitiveQueryTypeTest.java    |  21 +-
 .../scan/executor/util/RestructureUtilTest.java |  18 +-
 .../scan/expression/ColumnExpressionTest.java   |   5 +-
 .../scan/expression/ExpressionResultTest.java   | 141 ++--
 .../scan/expression/LiteralExpressionTest.java  |   8 +-
 .../conditional/EqualToExpressionUnitTest.java  |  31 +-
 .../GreaterThanEqualToExpressionUnitTest.java   |  45 +-
 .../GreaterThanExpressionUnitTest.java          |  41 +-
 .../conditional/InExpressionUnitTest.java       |  47 +-
 .../LessThanEqualToExpressionUnitTest.java      |  41 +-
 .../conditional/LessThanExpressionUnitTest.java |  41 +-
 .../conditional/ListExpressionUnitTest.java     |   5 +-
 .../NotEqualsExpressionUnitTest.java            |  49 +-
 .../conditional/NotInExpressionUnitTest.java    |  47 +-
 .../expression/logical/AndExpressionTest.java   |   9 +-
 .../expression/logical/FalseExpressionTest.java |   6 +-
 .../expression/logical/OrExpressionTest.java    |   9 +-
 .../core/scan/filter/FilterUtilTest.java        |  59 +-
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 .../carbondata/core/util/CarbonUtilTest.java    |  15 +-
 .../core/util/DataFileFooterConverterTest.java  |  37 +-
 .../carbondata/core/util/DataTypeUtilTest.java  |  70 +-
 .../core/util/RangeFilterProcessorTest.java     |  97 +--
 .../CarbonDictionarySortInfoPreparatorTest.java |   5 +-
 .../CarbonDictionarySortModelTest.java          |  73 +--
 .../hadoop/ft/CarbonInputFormat_FT.java         |   5 +-
 .../hadoop/ft/CarbonInputMapperTest.java        |   5 +-
 .../test/util/ObjectSerializationUtilTest.java  |   9 +-
 .../hadoop/test/util/StoreCreator.java          |  15 +-
 .../hive/CarbonDictionaryDecodeReadSupport.java | 109 +---
 .../carbondata/presto/CarbonTypeUtil.java       |  32 +-
 .../presto/CarbonVectorizedRecordReader.java    |  31 +-
 .../carbondata/presto/CarbondataMetadata.java   |  51 +-
 .../carbondata/presto/PrestoFilterUtil.java     |  25 +-
 .../CarbonDictionaryDecodeReadSupport.scala     |   7 +-
 .../presto/util/CarbonDataStoreCreator.scala    |  22 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   4 +-
 .../partition/TestDDLForPartitionTable.scala    |   9 +-
 ...ForPartitionTableWithDefaultProperties.scala |  10 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   4 +-
 .../spark/rdd/CarbonScanPartitionRDD.scala      |   4 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  46 +-
 .../carbondata/spark/util/CommonUtil.scala      |   6 +-
 .../spark/util/DataTypeConverterUtil.scala      |  92 +--
 .../spark/util/GlobalDictionaryUtil.scala       |   8 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   6 +-
 .../command/carbonTableSchemaCommon.scala       |  14 +-
 .../readsupport/SparkRowReadSupportImpl.java    |  28 +-
 .../spark/CarbonDataFrameWriter.scala           |   6 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |   2 +-
 .../spark/sql/CarbonDatasourceRelation.scala    |  18 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  23 +-
 .../org/apache/spark/sql/CarbonSparkUtil.scala  |   2 +-
 .../spark/sql/optimizer/CarbonFilters.scala     |  16 +-
 .../VectorizedCarbonRecordReader.java           |  31 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |   2 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |   2 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  56 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  36 +-
 .../spark/sql/optimizer/CarbonFilters.scala     |  24 +-
 .../processing/datatypes/StructDataType.java    |   4 +-
 .../converter/impl/FieldEncoderFactory.java     |  62 +-
 .../impl/NonDictionaryFieldConverterImpl.java   |   5 +-
 .../loading/parser/CarbonParserFactory.java     |  49 +-
 .../partition/impl/HashPartitionerImpl.java     |  23 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 187 +++---
 .../holder/UnsafeSortTempFileChunkHolder.java   |  40 +-
 .../merger/UnsafeIntermediateFileMerger.java    |  46 +-
 .../merger/CompactionResultSortProcessor.java   |  16 +-
 .../sort/sortdata/IntermediateFileMerger.java   |  36 +-
 .../processing/sort/sortdata/SortDataRows.java  |  38 +-
 .../sort/sortdata/SortTempFileChunkHolder.java  |  38 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   3 +-
 .../carbondata/processing/store/TablePage.java  |  11 +-
 .../util/CarbonDataProcessorUtil.java           |  28 +-
 .../carbondata/processing/StoreCreator.java     |  15 +-
 196 files changed, 4448 insertions(+), 4256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
index bc748c6..223812e 100644
--- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ColumnDictionaryInfo.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -193,7 +194,7 @@ public class ColumnDictionaryInfo extends AbstractColumnDictionaryInfo {
       int surrogateKey = sortedSurrogates.get(mid);
       byte[] dictionaryValue = getDictionaryBytesFromSurrogate(surrogateKey);
       int cmp = -1;
-      if (this.getDataType() != DataType.STRING) {
+      if (this.getDataType() != DataTypes.STRING) {
         cmp = compareFilterKeyWithDictionaryKey(
             new String(dictionaryValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
             filterKey, this.getDataType());
@@ -240,7 +241,7 @@ public class ColumnDictionaryInfo extends AbstractColumnDictionaryInfo {
         //fortify fix
         if (null == dictionaryValue) {
           cmp = -1;
-        } else if (this.getDataType() != DataType.STRING) {
+        } else if (this.getDataType() != DataTypes.STRING) {
           cmp = compareFilterKeyWithDictionaryKey(
               new String(dictionaryValue, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
               filterKey, this.getDataType());
@@ -270,35 +271,32 @@ public class ColumnDictionaryInfo extends AbstractColumnDictionaryInfo {
   private int compareFilterKeyWithDictionaryKey(String dictionaryVal, String memberVal,
       DataType dataType) {
     try {
-      switch (dataType) {
-        case SHORT:
-          return Short.compare((Short.parseShort(dictionaryVal)), (Short.parseShort(memberVal)));
-        case INT:
-          return Integer.compare((Integer.parseInt(dictionaryVal)), (Integer.parseInt(memberVal)));
-        case DOUBLE:
-          return DataTypeUtil
-              .compareDoubleWithNan((Double.parseDouble(dictionaryVal)),
-                  (Double.parseDouble(memberVal)));
-        case LONG:
-          return Long.compare((Long.parseLong(dictionaryVal)), (Long.parseLong(memberVal)));
-        case BOOLEAN:
-          return Boolean
-              .compare((Boolean.parseBoolean(dictionaryVal)), (Boolean.parseBoolean(memberVal)));
-        case DATE:
-        case TIMESTAMP:
-          String format = CarbonUtil.getFormatFromProperty(dataType);
-          SimpleDateFormat parser = new SimpleDateFormat(format);
-          Date dateToStr;
-          Date dictionaryDate;
-          dateToStr = parser.parse(memberVal);
-          dictionaryDate = parser.parse(dictionaryVal);
-          return dictionaryDate.compareTo(dateToStr);
-        case DECIMAL:
-          java.math.BigDecimal javaDecValForDictVal = new java.math.BigDecimal(dictionaryVal);
-          java.math.BigDecimal javaDecValForMemberVal = new java.math.BigDecimal(memberVal);
-          return javaDecValForDictVal.compareTo(javaDecValForMemberVal);
-        default:
-          return -1;
+      if (dataType == DataTypes.SHORT) {
+        return Short.compare((Short.parseShort(dictionaryVal)), (Short.parseShort(memberVal)));
+      } else if (dataType == DataTypes.INT) {
+        return Integer.compare((Integer.parseInt(dictionaryVal)), (Integer.parseInt(memberVal)));
+      } else if (dataType == DataTypes.DOUBLE) {
+        return DataTypeUtil.compareDoubleWithNan(
+            (Double.parseDouble(dictionaryVal)), (Double.parseDouble(memberVal)));
+      } else if (dataType == DataTypes.LONG) {
+        return Long.compare((Long.parseLong(dictionaryVal)), (Long.parseLong(memberVal)));
+      } else if (dataType == DataTypes.BOOLEAN) {
+        return Boolean.compare(
+            (Boolean.parseBoolean(dictionaryVal)), (Boolean.parseBoolean(memberVal)));
+      } else if (dataType == DataTypes.DATE || dataType == DataTypes.TIMESTAMP) {
+        String format = CarbonUtil.getFormatFromProperty(dataType);
+        SimpleDateFormat parser = new SimpleDateFormat(format);
+        Date dateToStr;
+        Date dictionaryDate;
+        dateToStr = parser.parse(memberVal);
+        dictionaryDate = parser.parse(dictionaryVal);
+        return dictionaryDate.compareTo(dateToStr);
+      } else if (dataType == DataTypes.DECIMAL) {
+        java.math.BigDecimal javaDecValForDictVal = new java.math.BigDecimal(dictionaryVal);
+        java.math.BigDecimal javaDecValForMemberVal = new java.math.BigDecimal(memberVal);
+        return javaDecValForDictVal.compareTo(javaDecValForMemberVal);
+      } else {
+        return -1;
       }
     } catch (Exception e) {
       //In all data types excluding String data type the null member will be the highest

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 51b92cc..0510d7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -632,8 +632,8 @@ public final class CarbonCommonConstants {
   public static final String SHORT = "Short";
   public static final String NUMERIC = "Numeric";
   public static final String TIMESTAMP = "Timestamp";
-  public static final String ARRAY = "ARRAY";
-  public static final String STRUCT = "STRUCT";
+  public static final String ARRAY = "array";
+  public static final String STRUCT = "struct";
   public static final String FROM = "from";
   /**
    * FACT_UPDATE_EXTENSION.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 9796a77..62cf813 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.events.ChangeEvent;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index b0b62db..9f29e27 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -62,7 +63,7 @@ public class TableSpec {
         if (dimension.isComplex()) {
           DimensionSpec spec = new DimensionSpec(ColumnType.COMPLEX, dimension);
           dimensionSpec[dimIndex++] = spec;
-        } else if (dimension.getDataType() == DataType.TIMESTAMP && !dimension
+        } else if (dimension.getDataType() == DataTypes.TIMESTAMP && !dimension
             .isDirectDictionaryEncoding()) {
           DimensionSpec spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension);
           dimensionSpec[dimIndex++] = spec;
@@ -167,7 +168,7 @@ public class TableSpec {
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeUTF(fieldName);
-      out.writeByte(schemaDataType.ordinal());
+      out.writeByte(schemaDataType.getId());
       out.writeByte(columnType.ordinal());
       out.writeInt(scale);
       out.writeInt(precision);
@@ -176,7 +177,7 @@ public class TableSpec {
     @Override
     public void readFields(DataInput in) throws IOException {
       this.fieldName = in.readUTF();
-      this.schemaDataType = DataType.valueOf(in.readByte());
+      this.schemaDataType = DataTypes.valueOf(in.readByte());
       this.columnType = ColumnType.valueOf(in.readByte());
       this.scale = in.readInt();
       this.precision = in.readInt();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index a742a5b..2ac200f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthVariableSplitGenerator;
 import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -378,7 +379,7 @@ public class SegmentProperties {
         // as complex type will be stored at last so once complex type started all the dimension
         // will be added to complex type
         else if (isComplexDimensionStarted || CarbonUtil.hasDataType(columnSchema.getDataType(),
-            new DataType[] { DataType.ARRAY, DataType.STRUCT })) {
+            new DataType[] { DataTypes.ARRAY, DataTypes.STRUCT })) {
           cardinalityIndexForComplexDimensionColumn.add(tableOrdinal);
           carbonDimension =
               new CarbonDimension(columnSchema, dimensonOrdinal++, -1, -1, ++complexTypeOrdinal);
@@ -668,7 +669,7 @@ public class SegmentProperties {
     int k = eachDimColumnValueSize.length + eachComplexDimColumnValueSize.length;
     for (int i = 0; i < measures.size(); i++) {
       DataType dataType = measures.get(i).getDataType();
-      if (dataType.equals(DataType.DECIMAL)) {
+      if (dataType.equals(DataTypes.DECIMAL)) {
         dimensionValueSize[k++] = -1;
       } else {
         dimensionValueSize[k++] = 8;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
index 97fd226..6629d31 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.chunk.impl;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
@@ -103,17 +105,14 @@ public class FixedLengthDimensionDataChunk extends AbstractDimensionDataChunk {
         if (valueFromSurrogate == null) {
           vector.putNull(vectorOffset++);
         } else {
-          switch (columnVectorInfo.directDictionaryGenerator.getReturnType()) {
-            case INT:
-              vector.putInt(vectorOffset++, (int) valueFromSurrogate);
-              break;
-            case LONG:
-              vector.putLong(vectorOffset++, (long) valueFromSurrogate);
-              break;
-            default:
-              throw new IllegalArgumentException(
-                  "unsupported data type: " + columnVectorInfo.directDictionaryGenerator
-                      .getReturnType());
+          DataType dataType = columnVectorInfo.directDictionaryGenerator.getReturnType();
+          if (dataType == DataTypes.INT) {
+            vector.putInt(vectorOffset++, (int) valueFromSurrogate);
+          } else if (dataType == DataTypes.LONG) {
+            vector.putLong(vectorOffset++, (long) valueFromSurrogate);
+          } else {
+            throw new IllegalArgumentException("unsupported data type: " +
+                columnVectorInfo.directDictionaryGenerator.getReturnType());
           }
         }
       }
@@ -147,17 +146,14 @@ public class FixedLengthDimensionDataChunk extends AbstractDimensionDataChunk {
         if (valueFromSurrogate == null) {
           vector.putNull(vectorOffset++);
         } else {
-          switch (columnVectorInfo.directDictionaryGenerator.getReturnType()) {
-            case INT:
-              vector.putInt(vectorOffset++, (int) valueFromSurrogate);
-              break;
-            case LONG:
-              vector.putLong(vectorOffset++, (long) valueFromSurrogate);
-              break;
-            default:
-              throw new IllegalArgumentException(
-                  "unsupported data type: " + columnVectorInfo.directDictionaryGenerator
-                      .getReturnType());
+          DataType dataType = columnVectorInfo.directDictionaryGenerator.getReturnType();
+          if (dataType == DataTypes.INT) {
+            vector.putInt(vectorOffset++, (int) valueFromSurrogate);
+          } else if (dataType == DataTypes.LONG) {
+            vector.putLong(vectorOffset++, (long) valueFromSurrogate);
+          } else {
+            throw new IllegalArgumentException("unsupported data type: " +
+                columnVectorInfo.directDictionaryGenerator.getReturnType());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 3191200..97f0b3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -30,7 +30,11 @@ import java.util.zip.GZIPInputStream;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
-import org.apache.carbondata.core.datastore.filesystem.*;
+import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 6c534d6..012413b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -30,17 +30,18 @@ import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsColle
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
-import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE_ARRAY;
-import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
-import static org.apache.carbondata.core.metadata.datatype.DataType.DOUBLE;
-import static org.apache.carbondata.core.metadata.datatype.DataType.FLOAT;
-import static org.apache.carbondata.core.metadata.datatype.DataType.INT;
-import static org.apache.carbondata.core.metadata.datatype.DataType.LONG;
-import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT;
-import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT_INT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.INT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.LONG;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT_INT;
 
 public abstract class ColumnPage {
 
@@ -183,59 +184,42 @@ public abstract class ColumnPage {
       int pageSize) throws MemoryException {
     ColumnPage instance;
     if (unsafe) {
-      switch (dataType) {
-        case BYTE:
-        case SHORT:
-        case SHORT_INT:
-        case INT:
-        case LONG:
-        case FLOAT:
-        case DOUBLE:
-          instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
-          break;
-        case DECIMAL:
-          instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
-          break;
-        case STRING:
-        case BYTE_ARRAY:
-          instance =
-              new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
-          break;
-        default:
-          throw new RuntimeException("Unsupported data dataType: " + dataType);
+      if (dataType == DataTypes.BYTE ||
+          dataType == DataTypes.SHORT ||
+          dataType == DataTypes.SHORT_INT ||
+          dataType == DataTypes.INT ||
+          dataType == DataTypes.LONG ||
+          dataType == DataTypes.FLOAT ||
+          dataType == DataTypes.DOUBLE) {
+        instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
+      } else if (dataType == DataTypes.DECIMAL) {
+        instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
+      } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+        instance = new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+      } else {
+        throw new RuntimeException("Unsupported data dataType: " + dataType);
       }
     } else {
-      switch (dataType) {
-        case BYTE:
-          instance = newBytePage(columnSpec, new byte[pageSize]);
-          break;
-        case SHORT:
-          instance = newShortPage(columnSpec, new short[pageSize]);
-          break;
-        case SHORT_INT:
-          instance = newShortIntPage(columnSpec, new byte[pageSize * 3]);
-          break;
-        case INT:
-          instance = newIntPage(columnSpec, new int[pageSize]);
-          break;
-        case LONG:
-          instance = newLongPage(columnSpec, new long[pageSize]);
-          break;
-        case FLOAT:
-          instance = newFloatPage(columnSpec, new float[pageSize]);
-          break;
-        case DOUBLE:
-          instance = newDoublePage(columnSpec, new double[pageSize]);
-          break;
-        case DECIMAL:
-          instance = newDecimalPage(columnSpec, new byte[pageSize][]);
-          break;
-        case STRING:
-        case BYTE_ARRAY:
-          instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
-          break;
-        default:
-          throw new RuntimeException("Unsupported data dataType: " + dataType);
+      if (dataType == DataTypes.BYTE) {
+        instance = newBytePage(columnSpec, new byte[pageSize]);
+      } else if (dataType == DataTypes.SHORT) {
+        instance = newShortPage(columnSpec, new short[pageSize]);
+      } else if (dataType == DataTypes.SHORT_INT) {
+        instance = newShortIntPage(columnSpec, new byte[pageSize * 3]);
+      } else if (dataType == DataTypes.INT) {
+        instance = newIntPage(columnSpec, new int[pageSize]);
+      } else if (dataType == DataTypes.LONG) {
+        instance = newLongPage(columnSpec, new long[pageSize]);
+      } else if (dataType == DataTypes.FLOAT) {
+        instance = newFloatPage(columnSpec, new float[pageSize]);
+      } else if (dataType == DataTypes.DOUBLE) {
+        instance = newDoublePage(columnSpec, new double[pageSize]);
+      } else if (dataType == DataTypes.DECIMAL) {
+        instance = newDecimalPage(columnSpec, new byte[pageSize][]);
+      } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+        instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
+      } else {
+        throw new RuntimeException("Unsupported data dataType: " + dataType);
       }
     }
     return instance;
@@ -360,38 +344,29 @@ public abstract class ColumnPage {
       nullBitSet.set(rowId);
       return;
     }
-    switch (dataType) {
-      case BYTE:
-        putByte(rowId, (byte) value);
-        statsCollector.update((byte) value);
-        break;
-      case SHORT:
-        putShort(rowId, (short) value);
-        statsCollector.update((short) value);
-        break;
-      case INT:
-        putInt(rowId, (int) value);
-        statsCollector.update((int) value);
-        break;
-      case LONG:
-        putLong(rowId, (long) value);
-        statsCollector.update((long) value);
-        break;
-      case DOUBLE:
-        putDouble(rowId, (double) value);
-        statsCollector.update((double) value);
-        break;
-      case DECIMAL:
-        putDecimal(rowId, (BigDecimal) value);
-        statsCollector.update((BigDecimal) value);
-        break;
-      case STRING:
-      case BYTE_ARRAY:
-        putBytes(rowId, (byte[]) value);
-        statsCollector.update((byte[]) value);
-        break;
-      default:
-        throw new RuntimeException("unsupported data type: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      putByte(rowId, (byte) value);
+      statsCollector.update((byte) value);
+    } else if (dataType == DataTypes.SHORT) {
+      putShort(rowId, (short) value);
+      statsCollector.update((short) value);
+    } else if (dataType == DataTypes.INT) {
+      putInt(rowId, (int) value);
+      statsCollector.update((int) value);
+    } else if (dataType == DataTypes.LONG) {
+      putLong(rowId, (long) value);
+      statsCollector.update((long) value);
+    } else if (dataType == DataTypes.DOUBLE) {
+      putDouble(rowId, (double) value);
+      statsCollector.update((double) value);
+    } else if (dataType == DataTypes.DECIMAL) {
+      putDecimal(rowId, (BigDecimal) value);
+      statsCollector.update((BigDecimal) value);
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+      putBytes(rowId, (byte[]) value);
+      statsCollector.update((byte[]) value);
+    } else {
+      throw new RuntimeException("unsupported data type: " + dataType);
     }
   }
 
@@ -445,27 +420,20 @@ public abstract class ColumnPage {
    * Set null at rowId
    */
   private void putNull(int rowId) {
-    switch (dataType) {
-      case BYTE:
-        putByte(rowId, (byte) 0);
-        break;
-      case SHORT:
-        putShort(rowId, (short) 0);
-        break;
-      case INT:
-        putInt(rowId, 0);
-        break;
-      case LONG:
-        putLong(rowId, 0L);
-        break;
-      case DOUBLE:
-        putDouble(rowId, 0.0);
-        break;
-      case DECIMAL:
-        putDecimal(rowId, BigDecimal.ZERO);
-        break;
-      default:
-        throw new IllegalArgumentException("unsupported data type: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      putByte(rowId, (byte) 0);
+    } else if (dataType == DataTypes.SHORT) {
+      putShort(rowId, (short) 0);
+    } else if (dataType == DataTypes.INT) {
+      putInt(rowId, 0);
+    } else if (dataType == DataTypes.LONG) {
+      putLong(rowId, 0L);
+    } else if (dataType == DataTypes.DOUBLE) {
+      putDouble(rowId, 0.0);
+    } else if (dataType == DataTypes.DECIMAL) {
+      putDecimal(rowId, BigDecimal.ZERO);
+    } else {
+      throw new IllegalArgumentException("unsupported data type: " + dataType);
     }
   }
 
@@ -573,27 +541,26 @@ public abstract class ColumnPage {
    * Compress page data using specified compressor
    */
   public byte[] compress(Compressor compressor) throws MemoryException, IOException {
-    switch (dataType) {
-      case BYTE:
-        return compressor.compressByte(getBytePage());
-      case SHORT:
-        return compressor.compressShort(getShortPage());
-      case SHORT_INT:
-        return compressor.compressByte(getShortIntPage());
-      case INT:
-        return compressor.compressInt(getIntPage());
-      case LONG:
-        return compressor.compressLong(getLongPage());
-      case FLOAT:
-        return compressor.compressFloat(getFloatPage());
-      case DOUBLE:
-        return compressor.compressDouble(getDoublePage());
-      case DECIMAL:
-        return compressor.compressByte(getDecimalPage());
-      case BYTE_ARRAY:
-        return compressor.compressByte(getLVFlattenedBytePage());
-      default:
-        throw new UnsupportedOperationException("unsupport compress column page: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      return compressor.compressByte(getBytePage());
+    } else if (dataType == DataTypes.SHORT) {
+      return compressor.compressShort(getShortPage());
+    } else if (dataType == DataTypes.SHORT_INT) {
+      return compressor.compressByte(getShortIntPage());
+    } else if (dataType == DataTypes.INT) {
+      return compressor.compressInt(getIntPage());
+    } else if (dataType == DataTypes.LONG) {
+      return compressor.compressLong(getLongPage());
+    } else if (dataType == DataTypes.FLOAT) {
+      return compressor.compressFloat(getFloatPage());
+    } else if (dataType == DataTypes.DOUBLE) {
+      return compressor.compressDouble(getDoublePage());
+    } else if (dataType == DataTypes.DECIMAL) {
+      return compressor.compressByte(getDecimalPage());
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
+      return compressor.compressByte(getLVFlattenedBytePage());
+    } else {
+      throw new UnsupportedOperationException("unsupport compress column page: " + dataType);
     }
   }
 
@@ -606,34 +573,34 @@ public abstract class ColumnPage {
       throws MemoryException {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
-    switch (meta.getStoreDataType()) {
-      case BYTE:
-        byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-        return newBytePage(columnSpec, byteData);
-      case SHORT:
-        short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-        return newShortPage(columnSpec, shortData);
-      case SHORT_INT:
-        byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-        return newShortIntPage(columnSpec, shortIntData);
-      case INT:
-        int[] intData = compressor.unCompressInt(compressedData, offset, length);
-        return newIntPage(columnSpec, intData);
-      case LONG:
-        long[] longData = compressor.unCompressLong(compressedData, offset, length);
-        return newLongPage(columnSpec, longData);
-      case FLOAT:
-        float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
-        return newFloatPage(columnSpec, floatData);
-      case DOUBLE:
-        double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
-        return newDoublePage(columnSpec, doubleData);
-      case BYTE_ARRAY:
-        byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-        return newLVBytesPage(columnSpec, lvVarBytes);
-      default:
-        throw new UnsupportedOperationException("unsupport uncompress column page: " +
-            meta.getStoreDataType());
+    DataType storeDataType = meta.getStoreDataType();
+    if (storeDataType == DataTypes.BYTE) {
+      byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
+      return newBytePage(columnSpec, byteData);
+    } else if (storeDataType == DataTypes.SHORT) {
+      short[] shortData = compressor.unCompressShort(compressedData, offset, length);
+      return newShortPage(columnSpec, shortData);
+    } else if (storeDataType == DataTypes.SHORT_INT) {
+      byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
+      return newShortIntPage(columnSpec, shortIntData);
+    } else if (storeDataType == DataTypes.INT) {
+      int[] intData = compressor.unCompressInt(compressedData, offset, length);
+      return newIntPage(columnSpec, intData);
+    } else if (storeDataType == DataTypes.LONG) {
+      long[] longData = compressor.unCompressLong(compressedData, offset, length);
+      return newLongPage(columnSpec, longData);
+    } else if (storeDataType == DataTypes.FLOAT) {
+      float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
+      return newFloatPage(columnSpec, floatData);
+    } else if (storeDataType == DataTypes.DOUBLE) {
+      double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
+      return newDoublePage(columnSpec, doubleData);
+    } else if (storeDataType == DataTypes.BYTE_ARRAY) {
+      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+      return newLVBytesPage(columnSpec, lvVarBytes);
+    } else {
+      throw new UnsupportedOperationException("unsupport uncompress column page: " +
+          meta.getStoreDataType());
     }
   }
 
@@ -645,35 +612,35 @@ public abstract class ColumnPage {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
     TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
     ColumnPage decimalPage = null;
-    switch (meta.getStoreDataType()) {
-      case BYTE:
-        byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), byteData.length);
-        decimalPage.setBytePage(byteData);
-        return decimalPage;
-      case SHORT:
-        short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortData.length);
-        decimalPage.setShortPage(shortData);
-        return decimalPage;
-      case SHORT_INT:
-        byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortIntData.length);
-        decimalPage.setShortIntPage(shortIntData);
-        return decimalPage;
-      case INT:
-        int[] intData = compressor.unCompressInt(compressedData, offset, length);
-        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), intData.length);
-        decimalPage.setIntPage(intData);
-        return decimalPage;
-      case LONG:
-        long[] longData = compressor.unCompressLong(compressedData, offset, length);
-        decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), longData.length);
-        decimalPage.setLongPage(longData);
-        return decimalPage;
-      default:
-        byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-        return newDecimalPage(columnSpec, lvEncodedBytes);
+    DataType storeDataType = meta.getStoreDataType();
+    if (storeDataType == DataTypes.BYTE) {
+      byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
+      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), byteData.length);
+      decimalPage.setBytePage(byteData);
+      return decimalPage;
+    } else if (storeDataType == DataTypes.SHORT) {
+      short[] shortData = compressor.unCompressShort(compressedData, offset, length);
+      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortData.length);
+      decimalPage.setShortPage(shortData);
+      return decimalPage;
+    } else if (storeDataType == DataTypes.SHORT_INT) {
+      byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
+      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), shortIntData.length);
+      decimalPage.setShortIntPage(shortIntData);
+      return decimalPage;
+    }  else if (storeDataType == DataTypes.INT) {
+      int[] intData = compressor.unCompressInt(compressedData, offset, length);
+      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), intData.length);
+      decimalPage.setIntPage(intData);
+      return decimalPage;
+    } else if (storeDataType == DataTypes.LONG) {
+      long[] longData = compressor.unCompressLong(compressedData, offset, length);
+      decimalPage = createDecimalPage(columnSpec, meta.getStoreDataType(), longData.length);
+      decimalPage.setLongPage(longData);
+      return decimalPage;
+    } else {
+      byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
+      return newDecimalPage(columnSpec, lvEncodedBytes);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index 4bdb252..cebb3c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.page;
 
 import java.math.BigDecimal;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 
 /**
@@ -50,41 +52,41 @@ public class LazyColumnPage extends ColumnPage {
 
   @Override
   public long getLong(int rowId) {
-    switch (columnPage.getDataType()) {
-      case BYTE:
-        return converter.decodeLong(columnPage.getByte(rowId));
-      case SHORT:
-        return converter.decodeLong(columnPage.getShort(rowId));
-      case SHORT_INT:
-        return converter.decodeLong(columnPage.getShortInt(rowId));
-      case INT:
-        return converter.decodeLong(columnPage.getInt(rowId));
-      case LONG:
-        return columnPage.getLong(rowId);
-      default:
-        throw new RuntimeException("internal error: " + this.toString());
+    DataType dataType = columnPage.getDataType();
+    if (dataType == DataTypes.BYTE) {
+      return converter.decodeLong(columnPage.getByte(rowId));
+    } else if (dataType == DataTypes.SHORT) {
+      return converter.decodeLong(columnPage.getShort(rowId));
+    } else if (dataType == DataTypes.SHORT_INT) {
+      return converter.decodeLong(columnPage.getShortInt(rowId));
+    } else if (dataType == DataTypes.INT) {
+      return converter.decodeLong(columnPage.getInt(rowId));
+    } else if (dataType == DataTypes.LONG) {
+      return columnPage.getLong(rowId);
+    } else {
+      throw new RuntimeException("internal error: " + this.toString());
     }
   }
 
   @Override
   public double getDouble(int rowId) {
-    switch (columnPage.getDataType()) {
-      case BYTE:
-        return converter.decodeDouble(columnPage.getByte(rowId));
-      case SHORT:
-        return converter.decodeDouble(columnPage.getShort(rowId));
-      case SHORT_INT:
-        return converter.decodeDouble(columnPage.getShortInt(rowId));
-      case INT:
-        return converter.decodeDouble(columnPage.getInt(rowId));
-      case LONG:
-        return converter.decodeDouble(columnPage.getLong(rowId));
-      case FLOAT:
-        return converter.decodeDouble(columnPage.getFloat(rowId));
-      case DOUBLE:
-        return columnPage.getDouble(rowId);
-      default:
-        throw new RuntimeException("internal error: " + this.toString());
+    DataType dataType = columnPage.getDataType();
+    if (dataType == DataTypes.BYTE) {
+      return converter.decodeDouble(columnPage.getByte(rowId));
+    } else if (dataType == DataTypes.SHORT) {
+      return converter.decodeDouble(columnPage.getShort(rowId));
+    } else if (dataType == DataTypes.SHORT_INT) {
+      return converter.decodeDouble(columnPage.getShortInt(rowId));
+    } else if (dataType == DataTypes.INT) {
+      return converter.decodeDouble(columnPage.getInt(rowId));
+    } else if (dataType == DataTypes.LONG) {
+      return converter.decodeDouble(columnPage.getLong(rowId));
+    } else if (dataType == DataTypes.FLOAT) {
+      return converter.decodeDouble(columnPage.getFloat(rowId));
+    } else if (dataType == DataTypes.DOUBLE) {
+      return columnPage.getDouble(rowId);
+    } else {
+      throw new RuntimeException("internal error: " + this.toString());
     }
   }
 
@@ -97,20 +99,19 @@ public class LazyColumnPage extends ColumnPage {
   public BigDecimal getDecimal(int rowId) {
     DecimalConverterFactory.DecimalConverter decimalConverter =
         ((DecimalColumnPage) columnPage).getDecimalConverter();
-    switch (columnPage.getDataType()) {
-      case BYTE:
-        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getByte(rowId)));
-      case SHORT:
-        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getShort(rowId)));
-      case SHORT_INT:
-        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getShortInt(rowId)));
-      case INT:
-        return decimalConverter.getDecimal(converter.decodeLong(columnPage.getInt(rowId)));
-      case LONG:
-      case DECIMAL:
-        return columnPage.getDecimal(rowId);
-      default:
-        throw new RuntimeException("internal error: " + this.toString());
+    DataType dataType = columnPage.getDataType();
+    if (dataType == DataTypes.BYTE) {
+      return decimalConverter.getDecimal(converter.decodeLong(columnPage.getByte(rowId)));
+    } else if (dataType == DataTypes.SHORT) {
+      return decimalConverter.getDecimal(converter.decodeLong(columnPage.getShort(rowId)));
+    } else if (dataType == DataTypes.SHORT_INT) {
+      return decimalConverter.getDecimal(converter.decodeLong(columnPage.getShortInt(rowId)));
+    } else if (dataType == DataTypes.INT) {
+      return decimalConverter.getDecimal(converter.decodeLong(columnPage.getInt(rowId)));
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.DECIMAL) {
+      return columnPage.getDecimal(rowId);
+    } else {
+      throw new RuntimeException("internal error: " + this.toString());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index 01d3d87..cb2d3bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -173,25 +174,19 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
   @Override
   public BigDecimal getDecimal(int rowId) {
     long value;
-    switch (dataType) {
-      case BYTE:
-        value = getByte(rowId);
-        break;
-      case SHORT:
-        value = getShort(rowId);
-        break;
-      case SHORT_INT:
-        value = getShortInt(rowId);
-        break;
-      case INT:
-        value = getInt(rowId);
-        break;
-      case LONG:
-        value = getLong(rowId);
-        break;
-      default:
-        byte[] bytes = byteArrayData[rowId];
-        return decimalConverter.getDecimal(bytes);
+    if (dataType == DataTypes.BYTE) {
+      value = getByte(rowId);
+    } else if (dataType == DataTypes.SHORT) {
+      value = getShort(rowId);
+    } else if (dataType == DataTypes.SHORT_INT) {
+      value = getShortInt(rowId);
+    } else if (dataType == DataTypes.INT) {
+      value = getInt(rowId);
+    } else if (dataType == DataTypes.LONG) {
+      value = getLong(rowId);
+    } else {
+      byte[] bytes = byteArrayData[rowId];
+      return decimalConverter.getDecimal(bytes);
     }
     return decimalConverter.getDecimal(value);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 33d306d..5f848c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -21,6 +21,7 @@ import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -318,40 +319,33 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
-    switch (dataType) {
-      case BYTE:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, byteData[i]);
-        }
-        break;
-      case SHORT:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, shortData[i]);
-        }
-        break;
-      case INT:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, intData[i]);
-        }
-        break;
-      case LONG:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, longData[i]);
-        }
-        break;
-      case FLOAT:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, floatData[i]);
-        }
-        break;
-      case DOUBLE:
-        for (int i = 0; i < pageSize; i++) {
-          codec.encode(i, doubleData[i]);
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("not support value conversion on " +
-            dataType + " page");
+    if (dataType == DataTypes.BYTE) {
+      for (int i = 0; i < pageSize; i++) {
+        codec.encode(i, byteData[i]);
+      }
+    } else if (dataType == DataTypes.SHORT) {
+      for (int i = 0; i < pageSize; i++) {
+        codec.encode(i, shortData[i]);
+      }
+    } else if (dataType == DataTypes.INT) {
+      for (int i = 0; i < pageSize; i++) {
+        codec.encode(i, intData[i]);
+      }
+    } else if (dataType == DataTypes.LONG) {
+      for (int i = 0; i < pageSize; i++) {
+        codec.encode(i, longData[i]);
+      }
+    } else if (dataType == DataTypes.FLOAT) {
+      for (int i = 0; i < pageSize; i++) {
+        codec.encode(i, floatData[i]);
+      }
+    } else if (dataType == DataTypes.DOUBLE) {
+      for (int i = 0; i < pageSize; i++) {
+        codec.encode(i, doubleData[i]);
+      }
+    } else {
+      throw new UnsupportedOperationException("not support value conversion on " +
+          dataType + " page");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
index 45fa7d8..b4f33b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeDecimalColumnPage.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -46,29 +47,25 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
   }
 
   private void initMemory() throws MemoryException {
-    switch (dataType) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        int size = pageSize << dataType.getSizeBits();
-        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-        baseAddress = memoryBlock.getBaseObject();
-        baseOffset = memoryBlock.getBaseOffset();
-        break;
-      case SHORT_INT:
-        size = pageSize * 3;
-        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-        baseAddress = memoryBlock.getBaseObject();
-        baseOffset = memoryBlock.getBaseOffset();
-        break;
-      case DECIMAL:
-        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
-        baseAddress = memoryBlock.getBaseObject();
-        baseOffset = memoryBlock.getBaseOffset();
-        break;
-      default:
-        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    if (dataType == DataTypes.BYTE ||
+        dataType == DataTypes.SHORT ||
+        dataType == DataTypes.INT ||
+        dataType == DataTypes.LONG) {
+      int size = pageSize << dataType.getSizeBits();
+      memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
+      baseAddress = memoryBlock.getBaseObject();
+      baseOffset = memoryBlock.getBaseOffset();
+    } else if (dataType == DataTypes.SHORT_INT) {
+      int size = pageSize * 3;
+      memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
+      baseAddress = memoryBlock.getBaseObject();
+      baseOffset = memoryBlock.getBaseOffset();
+    } else if (dataType == DataTypes.DECIMAL) {
+      memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) (capacity));
+      baseAddress = memoryBlock.getBaseObject();
+      baseOffset = memoryBlock.getBaseOffset();
+    } else {
+      throw new UnsupportedOperationException("invalid data type: " + dataType);
     }
   }
 
@@ -236,28 +233,22 @@ public class UnsafeDecimalColumnPage extends DecimalColumnPage {
   @Override
   public BigDecimal getDecimal(int rowId) {
     long value;
-    switch (dataType) {
-      case BYTE:
-        value = getByte(rowId);
-        break;
-      case SHORT:
-        value = getShort(rowId);
-        break;
-      case SHORT_INT:
-        value = getShortInt(rowId);
-        break;
-      case INT:
-        value = getInt(rowId);
-        break;
-      case LONG:
-        value = getLong(rowId);
-        break;
-      default:
-        int length = rowOffset[rowId + 1] - rowOffset[rowId];
-        byte[] bytes = new byte[length];
-        CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], bytes,
-            CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-        return decimalConverter.getDecimal(bytes);
+    if (dataType == DataTypes.BYTE) {
+      value = getByte(rowId);
+    } else if (dataType == DataTypes.SHORT) {
+      value = getShort(rowId);
+    } else if (dataType == DataTypes.SHORT_INT) {
+      value = getShortInt(rowId);
+    } else if (dataType == DataTypes.INT) {
+      value = getInt(rowId);
+    } else if (dataType == DataTypes.LONG) {
+      value = getLong(rowId);
+    } else {
+      int length = rowOffset[rowId + 1] - rowOffset[rowId];
+      byte[] bytes = new byte[length];
+      CarbonUnsafe.getUnsafe().copyMemory(baseAddress, baseOffset + rowOffset[rowId], bytes,
+          CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      return decimalConverter.getDecimal(bytes);
     }
     return decimalConverter.getDecimal(value);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index 7b55889..055c27a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -27,10 +27,10 @@ import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
-import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
 
 // This extension uses unsafe memory to store page data, for fix length data type only (byte,
 // short, integer, long, float, double)
@@ -46,37 +46,33 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
 
-  private static final int byteBits = BYTE.getSizeBits();
-  private static final int shortBits = DataType.SHORT.getSizeBits();
-  private static final int intBits = DataType.INT.getSizeBits();
-  private static final int longBits = DataType.LONG.getSizeBits();
-  private static final int floatBits = DataType.FLOAT.getSizeBits();
-  private static final int doubleBits = DataType.DOUBLE.getSizeBits();
+  private static final int byteBits = DataTypes.BYTE.getSizeBits();
+  private static final int shortBits = DataTypes.SHORT.getSizeBits();
+  private static final int intBits = DataTypes.INT.getSizeBits();
+  private static final int longBits = DataTypes.LONG.getSizeBits();
+  private static final int floatBits = DataTypes.FLOAT.getSizeBits();
+  private static final int doubleBits = DataTypes.DOUBLE.getSizeBits();
 
   UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize)
       throws MemoryException {
     super(columnSpec, dataType, pageSize);
-    switch (dataType) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-        int size = pageSize << dataType.getSizeBits();
-        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-        baseAddress = memoryBlock.getBaseObject();
-        baseOffset = memoryBlock.getBaseOffset();
-        break;
-      case SHORT_INT:
-        size = pageSize * 3;
-        memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
-        baseAddress = memoryBlock.getBaseObject();
-        baseOffset = memoryBlock.getBaseOffset();
-        break;
-      case DECIMAL:
-      case STRING:
-        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    if (dataType == DataTypes.BYTE ||
+        dataType == DataTypes.SHORT ||
+        dataType == DataTypes.INT ||
+        dataType == DataTypes.LONG ||
+        dataType == DataTypes.FLOAT ||
+        dataType == DataTypes.DOUBLE) {
+      int size = pageSize << dataType.getSizeBits();
+      memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
+      baseAddress = memoryBlock.getBaseObject();
+      baseOffset = memoryBlock.getBaseOffset();
+    } else if (dataType == DataTypes.SHORT_INT) {
+      int size = pageSize * 3;
+      memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, size);
+      baseAddress = memoryBlock.getBaseObject();
+      baseOffset = memoryBlock.getBaseOffset();
+    } else if (dataType == DataTypes.DECIMAL || dataType == DataTypes.STRING) {
+      throw new UnsupportedOperationException("invalid data type: " + dataType);
     }
   }
 
@@ -330,45 +326,38 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
     int pageSize = getPageSize();
-    switch (dataType) {
-      case BYTE:
-        for (int i = 0; i < pageSize; i++) {
-          long offset = i << byteBits;
-          codec.encode(i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
-        }
-        break;
-      case SHORT:
-        for (int i = 0; i < pageSize; i++) {
-          long offset = i << shortBits;
-          codec.encode(i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
-        }
-        break;
-      case INT:
-        for (int i = 0; i < pageSize; i++) {
-          long offset = i << intBits;
-          codec.encode(i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
-        }
-        break;
-      case LONG:
-        for (int i = 0; i < pageSize; i++) {
-          long offset = i << longBits;
-          codec.encode(i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
-        }
-        break;
-      case FLOAT:
-        for (int i = 0; i < pageSize; i++) {
-          long offset = i << floatBits;
-          codec.encode(i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
-        }
-        break;
-      case DOUBLE:
-        for (int i = 0; i < pageSize; i++) {
-          long offset = i << doubleBits;
-          codec.encode(i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("invalid data type: " + dataType);
+    if (dataType == DataTypes.BYTE) {
+      for (int i = 0; i < pageSize; i++) {
+        long offset = i << byteBits;
+        codec.encode(i, CarbonUnsafe.getUnsafe().getByte(baseAddress, baseOffset + offset));
+      }
+    } else if (dataType == DataTypes.SHORT) {
+      for (int i = 0; i < pageSize; i++) {
+        long offset = i << shortBits;
+        codec.encode(i, CarbonUnsafe.getUnsafe().getShort(baseAddress, baseOffset + offset));
+      }
+    } else if (dataType == DataTypes.INT) {
+      for (int i = 0; i < pageSize; i++) {
+        long offset = i << intBits;
+        codec.encode(i, CarbonUnsafe.getUnsafe().getInt(baseAddress, baseOffset + offset));
+      }
+    } else if (dataType == DataTypes.LONG) {
+      for (int i = 0; i < pageSize; i++) {
+        long offset = i << longBits;
+        codec.encode(i, CarbonUnsafe.getUnsafe().getLong(baseAddress, baseOffset + offset));
+      }
+    } else if (dataType == DataTypes.FLOAT) {
+      for (int i = 0; i < pageSize; i++) {
+        long offset = i << floatBits;
+        codec.encode(i, CarbonUnsafe.getUnsafe().getFloat(baseAddress, baseOffset + offset));
+      }
+    } else if (dataType == DataTypes.DOUBLE) {
+      for (int i = 0; i < pageSize; i++) {
+        long offset = i << doubleBits;
+        codec.encode(i, CarbonUnsafe.getUnsafe().getDouble(baseAddress, baseOffset + offset));
+      }
+    } else {
+      throw new UnsupportedOperationException("invalid data type: " + dataType);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 276b7ff..60c7112 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -27,19 +27,20 @@ import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
-import static org.apache.carbondata.core.metadata.datatype.DataType.BYTE;
-import static org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL;
 
 public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   static final int byteBits = BYTE.getSizeBits();
-  static final int shortBits = DataType.SHORT.getSizeBits();
-  static final int intBits = DataType.INT.getSizeBits();
-  static final int longBits = DataType.LONG.getSizeBits();
+  static final int shortBits = DataTypes.SHORT.getSizeBits();
+  static final int intBits = DataTypes.INT.getSizeBits();
+  static final int longBits = DataTypes.LONG.getSizeBits();
   // default size for each row, grows as needed
   static final int DEFAULT_ROW_SIZE = 8;
 
@@ -115,7 +116,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
             columnSpec.getScale());
     int size = decimalConverter.getSize();
     if (size < 0) {
-      return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataType.DECIMAL);
+      return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.DECIMAL);
     } else {
       // Here the size is always fixed.
       return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
@@ -127,7 +128,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
    */
   static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
       throws MemoryException {
-    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataType.BYTE_ARRAY);
+    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY);
   }
 
   private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index 3b5ae57..15e26e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.compress.DirectCompressCodec;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
@@ -149,9 +149,9 @@ public abstract class ColumnPageEncoder {
   private static EncodedColumnPage encodeChildColumn(byte[][] data)
       throws IOException, MemoryException {
     TableSpec.ColumnSpec spec =
-        new TableSpec.ColumnSpec("complex_inner_column", DataType.BYTE_ARRAY, ColumnType.COMPLEX);
+        new TableSpec.ColumnSpec("complex_inner_column", DataTypes.BYTE_ARRAY, ColumnType.COMPLEX);
     ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data);
-    ColumnPageEncoder encoder = new DirectCompressCodec(DataType.BYTE_ARRAY).createEncoder(null);
+    ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
     return encoder.encode(page);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index 3f8fca6..daccc32 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -87,7 +88,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   @Override
   public void write(DataOutput out) throws IOException {
     columnSpec.write(out);
-    out.writeByte(storeDataType.ordinal());
+    out.writeByte(storeDataType.getId());
     out.writeInt(getDecimal());
     out.writeByte(getDataTypeSelected());
     writeMinMax(out);
@@ -98,7 +99,7 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   public void readFields(DataInput in) throws IOException {
     columnSpec = new TableSpec.ColumnSpec();
     columnSpec.readFields(in);
-    storeDataType = DataType.valueOf(in.readByte());
+    storeDataType = DataTypes.valueOf(in.readByte());
     setDecimal(in.readInt());
     setDataTypeSelected(in.readByte());
     readMinMax(in);
@@ -106,109 +107,95 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   }
 
   private void writeMinMax(DataOutput out) throws IOException {
-    switch (columnSpec.getSchemaDataType()) {
-      case BYTE:
-        out.writeByte((byte) getMaxValue());
-        out.writeByte((byte) getMinValue());
-        out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case SHORT:
-        out.writeShort((short) getMaxValue());
-        out.writeShort((short) getMinValue());
-        out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case INT:
-        out.writeInt((int) getMaxValue());
-        out.writeInt((int) getMinValue());
-        out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case LONG:
-        out.writeLong((Long) getMaxValue());
-        out.writeLong((Long) getMinValue());
-        out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
-        break;
-      case DOUBLE:
-        out.writeDouble((Double) getMaxValue());
-        out.writeDouble((Double) getMinValue());
-        out.writeDouble(0d); // unique value is obsoleted, maintain for compatibility
-        break;
-      case DECIMAL:
-        byte[] maxAsBytes = getMaxAsBytes(columnSpec.getSchemaDataType());
-        byte[] minAsBytes = getMinAsBytes(columnSpec.getSchemaDataType());
-        byte[] unique = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
-        out.writeShort((short) maxAsBytes.length);
-        out.write(maxAsBytes);
-        out.writeShort((short) minAsBytes.length);
-        out.write(minAsBytes);
-        // unique value is obsoleted, maintain for compatibility
-        out.writeShort((short) unique.length);
-        out.write(unique);
-        out.writeInt(scale);
-        out.writeInt(precision);
-        break;
-      case BYTE_ARRAY:
-        // for complex type, it will come here, ignoring stats for complex type
-        // TODO: support stats for complex type
-        break;
-      default:
-        throw new IllegalArgumentException("invalid data type: " + storeDataType);
+    DataType dataType = columnSpec.getSchemaDataType();
+    if (dataType == DataTypes.BYTE) {
+      out.writeByte((byte) getMaxValue());
+      out.writeByte((byte) getMinValue());
+      out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
+    } else if (dataType == DataTypes.SHORT) {
+      out.writeShort((short) getMaxValue());
+      out.writeShort((short) getMinValue());
+      out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
+    } else if (dataType == DataTypes.INT) {
+      out.writeInt((int) getMaxValue());
+      out.writeInt((int) getMinValue());
+      out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
+    } else if (dataType == DataTypes.LONG) {
+      out.writeLong((Long) getMaxValue());
+      out.writeLong((Long) getMinValue());
+      out.writeLong(0L); // unique value is obsoleted, maintain for compatibility
+    } else if (dataType == DataTypes.DOUBLE) {
+      out.writeDouble((Double) getMaxValue());
+      out.writeDouble((Double) getMinValue());
+      out.writeDouble(0d); // unique value is obsoleted, maintain for compatibility
+    } else if (dataType == DataTypes.DECIMAL) {
+      byte[] maxAsBytes = getMaxAsBytes(columnSpec.getSchemaDataType());
+      byte[] minAsBytes = getMinAsBytes(columnSpec.getSchemaDataType());
+      byte[] unique = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
+      out.writeShort((short) maxAsBytes.length);
+      out.write(maxAsBytes);
+      out.writeShort((short) minAsBytes.length);
+      out.write(minAsBytes);
+      // unique value is obsoleted, maintain for compatibility
+      out.writeShort((short) unique.length);
+      out.write(unique);
+      out.writeInt(scale);
+      out.writeInt(precision);
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
+      // for complex type, it will come here, ignoring stats for complex type
+      // TODO: support stats for complex type
+    } else {
+      throw new IllegalArgumentException("invalid data type: " + storeDataType);
     }
   }
 
   private void readMinMax(DataInput in) throws IOException {
-    switch (columnSpec.getSchemaDataType()) {
-      case BYTE:
-        this.setMaxValue(in.readByte());
-        this.setMinValue(in.readByte());
-        in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case SHORT:
-        this.setMaxValue(in.readShort());
-        this.setMinValue(in.readShort());
-        in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case INT:
-        this.setMaxValue(in.readInt());
-        this.setMinValue(in.readInt());
-        in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case LONG:
-        this.setMaxValue(in.readLong());
-        this.setMinValue(in.readLong());
-        in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case DOUBLE:
-        this.setMaxValue(in.readDouble());
-        this.setMinValue(in.readDouble());
-        in.readDouble(); // for non exist value which is obsoleted, it is backward compatibility;
-        break;
-      case DECIMAL:
-        byte[] max = new byte[in.readShort()];
-        in.readFully(max);
-        this.setMaxValue(DataTypeUtil.byteToBigDecimal(max));
-        byte[] min = new byte[in.readShort()];
-        in.readFully(min);
-        this.setMinValue(DataTypeUtil.byteToBigDecimal(min));
-        // unique value is obsoleted, maintain for compatiability
-        short uniqueLength = in.readShort();
-        in.readFully(new byte[uniqueLength]);
-        this.scale = in.readInt();
-        this.precision = in.readInt();
-        break;
-      case BYTE_ARRAY:
-        // for complex type, it will come here, ignoring stats for complex type
-        // TODO: support stats for complex type
-        break;
-      default:
-        throw new IllegalArgumentException("invalid data type: " + storeDataType);
+    DataType dataType = columnSpec.getSchemaDataType();
+    if (dataType == DataTypes.BYTE) {
+      this.setMaxValue(in.readByte());
+      this.setMinValue(in.readByte());
+      in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
+    } else if (dataType == DataTypes.SHORT) {
+      this.setMaxValue(in.readShort());
+      this.setMinValue(in.readShort());
+      in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
+    } else if (dataType == DataTypes.INT) {
+      this.setMaxValue(in.readInt());
+      this.setMinValue(in.readInt());
+      in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
+    } else if (dataType == DataTypes.LONG) {
+      this.setMaxValue(in.readLong());
+      this.setMinValue(in.readLong());
+      in.readLong();  // for non exist value which is obsoleted, it is backward compatibility;
+    } else if (dataType == DataTypes.DOUBLE) {
+      this.setMaxValue(in.readDouble());
+      this.setMinValue(in.readDouble());
+      in.readDouble(); // for non exist value which is obsoleted, it is backward compatibility;
+    } else if (dataType == DataTypes.DECIMAL) {
+      byte[] max = new byte[in.readShort()];
+      in.readFully(max);
+      this.setMaxValue(DataTypeUtil.byteToBigDecimal(max));
+      byte[] min = new byte[in.readShort()];
+      in.readFully(min);
+      this.setMinValue(DataTypeUtil.byteToBigDecimal(min));
+      // unique value is obsoleted, maintain for compatiability
+      short uniqueLength = in.readShort();
+      in.readFully(new byte[uniqueLength]);
+      this.scale = in.readInt();
+      this.precision = in.readInt();
+    } else if (dataType == DataTypes.BYTE_ARRAY) {
+      // for complex type, it will come here, ignoring stats for complex type
+      // TODO: support stats for complex type
+    } else {
+      throw new IllegalArgumentException("invalid data type: " + storeDataType);
     }
   }
 
-  public byte[] getMaxAsBytes(DataType dataType) {
+  private byte[] getMaxAsBytes(DataType dataType) {
     return getValueAsBytes(getMaxValue(), dataType);
   }
 
-  public byte[] getMinAsBytes(DataType dataType) {
+  private byte[] getMinAsBytes(DataType dataType) {
     return getValueAsBytes(getMinValue(), dataType);
   }
 
@@ -217,40 +204,38 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
    */
   private byte[] getValueAsBytes(Object value, DataType dataType) {
     ByteBuffer b;
-    switch (dataType) {
-      case BYTE:
-        b = ByteBuffer.allocate(8);
-        b.putLong((byte) value);
-        b.flip();
-        return b.array();
-      case SHORT:
-        b = ByteBuffer.allocate(8);
-        b.putLong((short) value);
-        b.flip();
-        return b.array();
-      case INT:
-        b = ByteBuffer.allocate(8);
-        b.putLong((int) value);
-        b.flip();
-        return b.array();
-      case LONG:
-        b = ByteBuffer.allocate(8);
-        b.putLong((long) value);
-        b.flip();
-        return b.array();
-      case DOUBLE:
-        b = ByteBuffer.allocate(8);
-        b.putDouble((double) value);
-        b.flip();
-        return b.array();
-      case DECIMAL:
-        return DataTypeUtil.bigDecimalToByte((BigDecimal)value);
-      case STRING:
-      case TIMESTAMP:
-      case DATE:
-        return (byte[]) value;
-      default:
-        throw new IllegalArgumentException("Invalid data type: " + storeDataType);
+    if (dataType == DataTypes.BYTE_ARRAY) {
+      b = ByteBuffer.allocate(8);
+      b.putLong((byte) value);
+      b.flip();
+      return b.array();
+    } else if (dataType == DataTypes.SHORT) {
+      b = ByteBuffer.allocate(8);
+      b.putLong((short) value);
+      b.flip();
+      return b.array();
+    } else if (dataType == DataTypes.INT) {
+      b = ByteBuffer.allocate(8);
+      b.putLong((int) value);
+      b.flip();
+      return b.array();
+    } else if (dataType == DataTypes.LONG) {
+      b = ByteBuffer.allocate(8);
+      b.putLong((long) value);
+      b.flip();
+      return b.array();
+    } else if (dataType == DataTypes.DOUBLE) {
+      b = ByteBuffer.allocate(8);
+      b.putDouble((double) value);
+      b.flip();
+      return b.array();
+    } else if (dataType == DataTypes.DECIMAL) {
+      return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.TIMESTAMP
+        || dataType == DataTypes.DATE) {
+      return (byte[]) value;
+    } else {
+      throw new IllegalArgumentException("Invalid data type: " + storeDataType);
     }
   }