You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:21:54 UTC

[04/35] carbondata git commit: [CARBONDATA-1539] Change data type from enum to class

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 1addd03..ef89771 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types._
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.util.DataTypeUtil
@@ -165,34 +165,34 @@ case class CarbonDictionaryDecoder(
              """.stripMargin
 
             val caseCode = getDictionaryColumnIds(index)._3.getDataType match {
-              case DataType.INT =>
+              case CarbonDataTypes.INT =>
                 s"""
                    |int $value = Integer.parseInt(new String($valueIntern,
                    |org.apache.carbondata.core.constants.CarbonCommonConstants
                    |.DEFAULT_CHARSET_CLASS));
                  """.stripMargin
-              case DataType.SHORT =>
+              case CarbonDataTypes.SHORT =>
                 s"""
                    |short $value =
                    |Short.parseShort(new String($valueIntern,
                    |org.apache.carbondata.core.constants.CarbonCommonConstants
                    |.DEFAULT_CHARSET_CLASS));
                  """.stripMargin
-              case DataType.DOUBLE =>
+              case CarbonDataTypes.DOUBLE =>
                 s"""
                    |double $value =
                    |Double.parseDouble(new String($valueIntern,
                    |org.apache.carbondata.core.constants.CarbonCommonConstants
                    |.DEFAULT_CHARSET_CLASS));
                  """.stripMargin
-              case DataType.LONG =>
+              case CarbonDataTypes.LONG =>
                 s"""
                    |long $value =
                    |Long.parseLong(new String($valueIntern,
                    |org.apache.carbondata.core.constants.CarbonCommonConstants
                    |.DEFAULT_CHARSET_CLASS));
                  """.stripMargin
-              case DataType.DECIMAL =>
+              case CarbonDataTypes.DECIMAL =>
                 s"""
                    |org.apache.spark.sql.types.Decimal $value =
                    |Decimal.apply(new java.math.BigDecimal(
@@ -382,13 +382,13 @@ object CarbonDictionaryDecoder {
   def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
       relation: CarbonRelation): types.DataType = {
     carbonDimension.getDataType match {
-      case DataType.STRING => StringType
-      case DataType.SHORT => ShortType
-      case DataType.INT => IntegerType
-      case DataType.LONG => LongType
-      case DataType.DOUBLE => DoubleType
-      case DataType.BOOLEAN => BooleanType
-      case DataType.DECIMAL =>
+      case CarbonDataTypes.STRING => StringType
+      case CarbonDataTypes.SHORT => ShortType
+      case CarbonDataTypes.INT => IntegerType
+      case CarbonDataTypes.LONG => LongType
+      case CarbonDataTypes.DOUBLE => DoubleType
+      case CarbonDataTypes.BOOLEAN => BooleanType
+      case CarbonDataTypes.DECIMAL =>
         val scale: Int = carbonDimension.getColumnSchema.getScale
         val precision: Int = carbonDimension.getColumnSchema.getPrecision
         if (scale == 0 && precision == 0) {
@@ -396,12 +396,12 @@ object CarbonDictionaryDecoder {
         } else {
           DecimalType(precision, scale)
         }
-      case DataType.TIMESTAMP => TimestampType
-      case DataType.DATE => DateType
-      case DataType.STRUCT =>
+      case CarbonDataTypes.TIMESTAMP => TimestampType
+      case CarbonDataTypes.DATE => DateType
+      case CarbonDataTypes.STRUCT =>
         CarbonMetastoreTypes
           .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
-      case DataType.ARRAY =>
+      case CarbonDataTypes.ARRAY =>
         CarbonMetastoreTypes
           .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }
@@ -467,13 +467,13 @@ class CarbonDecoderRDD(
   def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
       relation: CarbonRelation): types.DataType = {
     carbonDimension.getDataType match {
-      case DataType.STRING => StringType
-      case DataType.SHORT => ShortType
-      case DataType.INT => IntegerType
-      case DataType.LONG => LongType
-      case DataType.DOUBLE => DoubleType
-      case DataType.BOOLEAN => BooleanType
-      case DataType.DECIMAL =>
+      case CarbonDataTypes.STRING => StringType
+      case CarbonDataTypes.SHORT => ShortType
+      case CarbonDataTypes.INT => IntegerType
+      case CarbonDataTypes.LONG => LongType
+      case CarbonDataTypes.DOUBLE => DoubleType
+      case CarbonDataTypes.BOOLEAN => BooleanType
+      case CarbonDataTypes.DECIMAL =>
         val scale: Int = carbonDimension.getColumnSchema.getScale
         val precision: Int = carbonDimension.getColumnSchema.getPrecision
         if (scale == 0 && precision == 0) {
@@ -481,12 +481,12 @@ class CarbonDecoderRDD(
         } else {
           DecimalType(precision, scale)
         }
-      case DataType.TIMESTAMP => TimestampType
-      case DataType.DATE => DateType
-      case DataType.STRUCT =>
+      case CarbonDataTypes.TIMESTAMP => TimestampType
+      case CarbonDataTypes.DATE => DateType
+      case CarbonDataTypes.STRUCT =>
         CarbonMetastoreTypes
           .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
-      case DataType.ARRAY =>
+      case CarbonDataTypes.ARRAY =>
         CarbonMetastoreTypes
           .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index f245df6..6bac0da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.hive
 
+import java.util
 import java.util.LinkedHashSet
 
 import scala.Array.canBuildFrom
@@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL
+import org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -45,7 +46,7 @@ case class CarbonRelation(
   extends LeafNode with MultiInstanceRelation {
 
   def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
-    childDim.getDataType.toString.toLowerCase match {
+    childDim.getDataType.getName.toLowerCase match {
       case "array" => s"${
         childDim.getColName.substring(dimName.length + 1)
       }:array<${ getArrayChildren(childDim.getColName) }>"
@@ -58,7 +59,7 @@ case class CarbonRelation(
 
   def getArrayChildren(dimName: String): String = {
     metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.toString.toLowerCase match {
+      childDim.getDataType.getName.toLowerCase match {
         case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
         case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
         case dType => addDecimalScaleAndPrecision(childDim, dType)
@@ -68,7 +69,7 @@ case class CarbonRelation(
 
   def getStructChildren(dimName: String): String = {
     metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.toString.toLowerCase match {
+      childDim.getDataType.getName.toLowerCase match {
         case "array" => s"${
           childDim.getColName.substring(dimName.length + 1)
         }:array<${ getArrayChildren(childDim.getColName) }>"
@@ -95,8 +96,7 @@ case class CarbonRelation(
     sett.asScala.toSeq.map(dim => {
       val dimval = metaData.carbonTable
         .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
-      val output: DataType = dimval.getDataType
-        .toString.toLowerCase match {
+      val output: DataType = dimval.getDataType.getName.toLowerCase match {
         case "array" =>
           CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
         case "struct" =>
@@ -118,14 +118,17 @@ case class CarbonRelation(
     new LinkedHashSet(
       tableMeta.carbonTable.
         getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
-        asScala.asJava).asScala.toSeq
-      .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
-        metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
-          .toLowerCase match {
-          case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
-          case others => others
-        }),
-        nullable = true)())
+        asScala.asJava).asScala.toSeq.map { x =>
+      val metastoreType = metaData.carbonTable.getMeasureByName(factTable, x.getColName)
+        .getDataType.getName.toLowerCase match {
+        case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
+        case others => others
+      }
+      AttributeReference(
+        x.getColName,
+        CarbonMetastoreTypes.toDataType(metastoreType),
+        nullable = true)()
+    }
   }
 
   override val output = {
@@ -134,7 +137,7 @@ case class CarbonRelation(
     // convert each column to Attribute
     columns.filter(!_.isInvisible).map { column =>
       if (column.isDimension()) {
-        val output: DataType = column.getDataType.toString.toLowerCase match {
+        val output: DataType = column.getDataType.getName.toLowerCase match {
           case "array" =>
             CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
           case "struct" =>
@@ -147,8 +150,7 @@ case class CarbonRelation(
           qualifier = Option(tableName + "." + column.getColName))
       } else {
         val output = CarbonMetastoreTypes.toDataType {
-          column.getDataType.toString
-            .toLowerCase match {
+          column.getDataType.getName.toLowerCase match {
             case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
               .getColumnSchema.getScale + ")"
             case others => others

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 73c9760..dbc807d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonContainsWith
 import org.apache.spark.sql.CarbonEndsWith
 
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
@@ -134,9 +134,9 @@ object CarbonFilters {
     def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
       val dataTypeOfAttribute = CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name))
       val dataType = if (Option(value).isDefined
-                         && dataTypeOfAttribute == DataType.STRING
+                         && dataTypeOfAttribute == CarbonDataTypes.STRING
                          && value.isInstanceOf[Double]) {
-        DataType.DOUBLE
+        CarbonDataTypes.DOUBLE
       } else {
         dataTypeOfAttribute
       }
@@ -396,24 +396,6 @@ object CarbonFilters {
     }
   }
 
-  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
-    var carbonColumn: CarbonColumn =
-      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
-    val dataType = if (carbonColumn != null) {
-      carbonColumn.getDataType
-    } else {
-      carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
-      carbonColumn.getDataType match {
-        case DataType.INT => DataType.INT
-        case DataType.SHORT => DataType.SHORT
-        case DataType.LONG => DataType.LONG
-        case DataType.DECIMAL => DataType.DECIMAL
-        case _ => DataType.DOUBLE
-      }
-    }
-    CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
-  }
-
   // Convert scala list to java list, Cannot use scalaList.asJava as while deserializing it is
   // not able find the classes inside scala list and gives ClassNotFoundException.
   private def convertToJavaList(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index e90fd4a..a340ab1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -176,8 +176,8 @@ public class StructDataType implements GenericDataType<StructObject> {
   }
 
   /*
-       * parse bytearray and bit pack
-       */
+   * parse bytearray and bit pack
+   */
   @Override
   public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
       KeyGenerator[] generator) throws IOException, KeyGenException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 2efbe26..193d3e6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -108,35 +110,37 @@ public class FieldEncoderFactory {
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
       String storePath, Map<Object, Integer> localCache) {
-    switch (carbonColumn.getDataType()) {
-      case ARRAY:
-        List<CarbonDimension> listOfChildDimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create array parser with complex delimiter
-        ArrayDataType arrayDataType =
-            new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
-        for (CarbonDimension dimension : listOfChildDimensions) {
-          arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath, localCache));
-        }
-        return arrayDataType;
-      case STRUCT:
-        List<CarbonDimension> dimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create struct parser with complex delimiter
-        StructDataType structDataType =
-            new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
-        for (CarbonDimension dimension : dimensions) {
-          structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath, localCache));
-        }
-        return structDataType;
-      case MAP:
-        throw new UnsupportedOperationException("Complex type Map is not supported yet");
-      default:
-        return new PrimitiveDataType(carbonColumn.getColName(), parentName,
-            carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
-            carbonTableIdentifier, client, useOnePass, storePath, localCache);
+    DataType dataType = carbonColumn.getDataType();
+    if (dataType == DataTypes.ARRAY) {
+      List<CarbonDimension> listOfChildDimensions =
+          ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      // Create array parser with complex delimiter
+      ArrayDataType arrayDataType =
+          new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+      for (CarbonDimension dimension : listOfChildDimensions) {
+        arrayDataType.addChildren(
+            createComplexType(dimension, carbonColumn.getColName(), cache, carbonTableIdentifier,
+                client, useOnePass, storePath, localCache));
+      }
+      return arrayDataType;
+    } else if (dataType == DataTypes.STRUCT) {
+      List<CarbonDimension> dimensions =
+          ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      // Create struct parser with complex delimiter
+      StructDataType structDataType =
+          new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+      for (CarbonDimension dimension : dimensions) {
+        structDataType.addChildren(
+            createComplexType(dimension, carbonColumn.getColName(), cache, carbonTableIdentifier,
+                client, useOnePass, storePath, localCache));
+      }
+      return structDataType;
+    } else if (dataType == DataTypes.MAP) {
+      throw new UnsupportedOperationException("Complex type Map is not supported yet");
+    } else {
+      return new PrimitiveDataType(carbonColumn.getColName(), parentName,
+          carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache, carbonTableIdentifier,
+          client, useOnePass, storePath, localCache);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index 8170680..eac9d69 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.converter.impl;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.DataField;
@@ -52,7 +53,7 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
 
   @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     String dimensionValue = row.getString(index);
-    if (null == dimensionValue && column.getDataType() != DataType.STRING) {
+    if (null == dimensionValue && column.getDataType() != DataTypes.STRING) {
       logHolder.setReason(
           CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
       updateWithNullValue(row);
@@ -81,7 +82,7 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
   }
 
   private void updateWithNullValue(CarbonRow row) {
-    if (dataType == DataType.STRING) {
+    if (dataType == DataTypes.STRING) {
       row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
     } else {
       row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
index 0ee1d90..608d0a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
@@ -18,6 +18,8 @@ package org.apache.carbondata.processing.loading.parser;
 
 import java.util.List;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.processing.loading.parser.impl.ArrayParserImpl;
@@ -51,30 +53,29 @@ public final class CarbonParserFactory {
    */
   private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
       String nullFormat, int depth) {
-    switch (carbonColumn.getDataType()) {
-      case ARRAY:
-        List<CarbonDimension> listOfChildDimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create array parser with complex delimiter
-        ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
-        for (CarbonDimension dimension : listOfChildDimensions) {
-          arrayParser
-              .addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
-        }
-        return arrayParser;
-      case STRUCT:
-        List<CarbonDimension> dimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create struct parser with complex delimiter
-        StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
-        for (CarbonDimension dimension : dimensions) {
-          parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
-        }
-        return parser;
-      case MAP:
-        throw new UnsupportedOperationException("Complex type Map is not supported yet");
-      default:
-        return new PrimitiveParserImpl();
+    DataType dataType = carbonColumn.getDataType();
+    if (dataType == DataTypes.ARRAY) {
+      List<CarbonDimension> listOfChildDimensions =
+          ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      // Create array parser with complex delimiter
+      ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
+      for (CarbonDimension dimension : listOfChildDimensions) {
+        arrayParser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
+      }
+      return arrayParser;
+    } else if (dataType == DataTypes.STRUCT) {
+      List<CarbonDimension> dimensions =
+          ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      // Create struct parser with complex delimiter
+      StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
+      for (CarbonDimension dimension : dimensions) {
+        parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
+      }
+      return parser;
+    } else if (dataType == DataTypes.MAP) {
+      throw new UnsupportedOperationException("Complex type Map is not supported yet");
+    } else {
+      return new PrimitiveParserImpl();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
index 06bd716..a41f734 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.partition.impl;
 
 import java.util.List;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.processing.loading.partition.Partitioner;
 
@@ -36,19 +38,14 @@ public class HashPartitionerImpl implements Partitioner<Object[]> {
     this.numberOfBuckets = numberOfBuckets;
     hashes = new Hash[indexes.size()];
     for (int i = 0; i < indexes.size(); i++) {
-      switch (columnSchemas.get(i).getDataType()) {
-        case SHORT:
-        case INT:
-        case LONG:
-          hashes[i] = new IntegralHash(indexes.get(i));
-          break;
-        case DOUBLE:
-        case FLOAT:
-        case DECIMAL:
-          hashes[i] = new DecimalHash(indexes.get(i));
-          break;
-        default:
-          hashes[i] = new StringHash(indexes.get(i));
+      DataType dataType = columnSchemas.get(i).getDataType();
+      if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
+        hashes[i] = new IntegralHash(indexes.get(i));
+      } else if (dataType == DataTypes.DOUBLE || dataType == DataTypes.FLOAT ||
+          dataType == DataTypes.DECIMAL) {
+        hashes[i] = new DecimalHash(indexes.get(i));
+      } else {
+        hashes[i] = new StringHash(indexes.get(i));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 14ab838..ff42e2d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -125,41 +126,35 @@ public class UnsafeCarbonRowPage {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        switch (measureDataType[mesCount]) {
-          case SHORT:
-            Short sval = (Short) value;
-            CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
-            size += 2;
-            break;
-          case INT:
-            Integer ival = (Integer) value;
-            CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
-            size += 4;
-            break;
-          case LONG:
-            Long val = (Long) value;
-            CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
-            size += 8;
-            break;
-          case DOUBLE:
-            Double doubleVal = (Double) value;
-            CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
-            size += 8;
-            break;
-          case DECIMAL:
-            BigDecimal decimalVal = (BigDecimal) value;
-            byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
-            CarbonUnsafe.getUnsafe().putShort(baseObject, address + size,
-                (short) bigDecimalInBytes.length);
-            size += 2;
-            CarbonUnsafe.getUnsafe()
-                .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-                    address + size, bigDecimalInBytes.length);
-            size += bigDecimalInBytes.length;
-            break;
-          default:
-            throw  new IllegalArgumentException("unsupported data type:" +
-                measureDataType[mesCount]);
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.SHORT) {
+          Short sval = (Short) value;
+          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
+          size += 2;
+        } else if (dataType == DataTypes.INT) {
+          Integer ival = (Integer) value;
+          CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
+          size += 4;
+        } else if (dataType == DataTypes.LONG) {
+          Long val = (Long) value;
+          CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
+          size += 8;
+        } else if (dataType == DataTypes.DOUBLE) {
+          Double doubleVal = (Double) value;
+          CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
+          size += 8;
+        } else if (dataType == DataTypes.DECIMAL) {
+          BigDecimal decimalVal = (BigDecimal) value;
+          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+          CarbonUnsafe.getUnsafe()
+              .putShort(baseObject, address + size, (short) bigDecimalInBytes.length);
+          size += 2;
+          CarbonUnsafe.getUnsafe()
+              .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+                  address + size, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
         }
         set(nullSetWords, mesCount);
       } else {
@@ -213,39 +208,33 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        switch (measureDataType[mesCount]) {
-          case SHORT:
-            Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            size += 2;
-            rowToFill[dimensionSize + mesCount] = sval;
-            break;
-          case INT:
-            Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-            size += 4;
-            rowToFill[dimensionSize + mesCount] = ival;
-            break;
-          case LONG:
-            Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
-            size += 8;
-            rowToFill[dimensionSize + mesCount] = val;
-            break;
-          case DOUBLE:
-            Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
-            size += 8;
-            rowToFill[dimensionSize + mesCount] = doubleVal;
-            break;
-          case DECIMAL:
-            short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            byte[] bigDecimalInBytes = new byte[aShort];
-            size += 2;
-            CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
-                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-            size += bigDecimalInBytes.length;
-            rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
-            break;
-          default:
-            throw new IllegalArgumentException("unsupported data type:" +
-                measureDataType[mesCount]);
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.SHORT) {
+          Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          size += 2;
+          rowToFill[dimensionSize + mesCount] = sval;
+        } else if (dataType == DataTypes.INT) {
+          Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+          size += 4;
+          rowToFill[dimensionSize + mesCount] = ival;
+        } else if (dataType == DataTypes.LONG) {
+          Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+          size += 8;
+          rowToFill[dimensionSize + mesCount] = val;
+        } else if (dataType == DataTypes.DOUBLE) {
+          Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+          size += 8;
+          rowToFill[dimensionSize + mesCount] = doubleVal;
+        } else if (dataType == DataTypes.DECIMAL) {
+          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          byte[] bigDecimalInBytes = new byte[aShort];
+          size += 2;
+          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+          rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
         }
       } else {
         rowToFill[dimensionSize + mesCount] = null;
@@ -301,40 +290,34 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        switch (measureDataType[mesCount]) {
-          case SHORT:
-            short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            size += 2;
-            stream.writeShort(sval);
-            break;
-          case INT:
-            int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-            size += 4;
-            stream.writeInt(ival);
-            break;
-          case LONG:
-            long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
-            size += 8;
-            stream.writeLong(val);
-            break;
-          case DOUBLE:
-            double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
-            size += 8;
-            stream.writeDouble(doubleVal);
-            break;
-          case DECIMAL:
-            short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-            byte[] bigDecimalInBytes = new byte[aShort];
-            size += 2;
-            CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
-                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-            size += bigDecimalInBytes.length;
-            stream.writeShort(aShort);
-            stream.write(bigDecimalInBytes);
-            break;
-          default:
-            throw new IllegalArgumentException("unsupported data type:" +
-                measureDataType[mesCount]);
+        DataType dataType = measureDataType[mesCount];
+        if (dataType == DataTypes.SHORT) {
+          short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          size += 2;
+          stream.writeShort(sval);
+        } else if (dataType == DataTypes.INT) {
+          int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+          size += 4;
+          stream.writeInt(ival);
+        } else if (dataType == DataTypes.LONG) {
+          long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
+          size += 8;
+          stream.writeLong(val);
+        } else if (dataType == DataTypes.DOUBLE) {
+          double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
+          size += 8;
+          stream.writeDouble(doubleVal);
+        } else if (dataType == DataTypes.DECIMAL) {
+          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+          byte[] bigDecimalInBytes = new byte[aShort];
+          size += 2;
+          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
+              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+          size += bigDecimalInBytes.length;
+          stream.writeShort(aShort);
+          stream.write(bigDecimalInBytes);
+        } else {
+          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 5fed2ea..404a521 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -325,28 +326,23 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
       for (int mesCount = 0; mesCount < measureCount; mesCount++) {
         if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          switch (measureDataType[mesCount]) {
-            case SHORT:
-              row[dimensionCount + mesCount] = stream.readShort();
-              break;
-            case INT:
-              row[dimensionCount + mesCount] = stream.readInt();
-              break;
-            case LONG:
-              row[dimensionCount + mesCount] = stream.readLong();
-              break;
-            case DOUBLE:
-              row[dimensionCount + mesCount] = stream.readDouble();
-              break;
-            case DECIMAL:
-              short aShort = stream.readShort();
-              byte[] bigDecimalInBytes = new byte[aShort];
-              stream.readFully(bigDecimalInBytes);
-              row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
-              break;
-            default:
-              throw new IllegalArgumentException("unsupported data type:" +
-                  measureDataType[mesCount]);
+          DataType dataType = measureDataType[mesCount];
+          if (dataType == DataTypes.SHORT) {
+            row[dimensionCount + mesCount] = stream.readShort();
+          } else if (dataType == DataTypes.INT) {
+            row[dimensionCount + mesCount] = stream.readInt();
+          } else if (dataType == DataTypes.LONG) {
+            row[dimensionCount + mesCount] = stream.readLong();
+          } else if (dataType == DataTypes.DOUBLE) {
+            row[dimensionCount + mesCount] = stream.readDouble();
+          } else if (dataType == DataTypes.DECIMAL) {
+            short aShort = stream.readShort();
+            byte[] bigDecimalInBytes = new byte[aShort];
+            stream.readFully(bigDecimalInBytes);
+            row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
+          } else {
+            throw new IllegalArgumentException(
+                "unsupported data type:" + measureDataType[mesCount]);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 4303ec8..9f7d6c3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
@@ -315,31 +316,26 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        switch (type[mesCount]) {
-          case SHORT:
-            rowData.putShort(size, (Short) value);
-            size += 2;
-            break;
-          case INT:
-            rowData.putInt(size, (Integer) value);
-            size += 4;
-            break;
-          case LONG:
-            rowData.putLong(size, (Long) value);
-            size += 8;
-            break;
-          case DOUBLE:
-            rowData.putDouble(size, (Double) value);
-            size += 8;
-            break;
-          case DECIMAL:
-            byte[] bigDecimalInBytes = (byte[]) value;
-            rowData.putShort(size, (short)bigDecimalInBytes.length);
-            size += 2;
-            for (int i = 0; i < bigDecimalInBytes.length; i++) {
-              rowData.put(size++, bigDecimalInBytes[i]);
-            }
-            break;
+        DataType dataType = type[mesCount];
+        if (dataType == DataTypes.SHORT) {
+          rowData.putShort(size, (Short) value);
+          size += 2;
+        } else if (dataType == DataTypes.INT) {
+          rowData.putInt(size, (Integer) value);
+          size += 4;
+        } else if (dataType == DataTypes.LONG) {
+          rowData.putLong(size, (Long) value);
+          size += 8;
+        } else if (dataType == DataTypes.DOUBLE) {
+          rowData.putDouble(size, (Double) value);
+          size += 8;
+        } else if (dataType == DataTypes.DECIMAL) {
+          byte[] bigDecimalInBytes = (byte[]) value;
+          rowData.putShort(size, (short) bigDecimalInBytes.length);
+          size += 2;
+          for (int i = 0; i < bigDecimalInBytes.length; i++) {
+            rowData.put(size++, bigDecimalInBytes[i]);
+          }
         }
         UnsafeCarbonRowPage.set(nullSetWords, mesCount);
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index edffae9..0e20ef5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -256,14 +257,13 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * @return
    */
   private Object getConvertedMeasureValue(Object value, DataType type) {
-    switch (type) {
-      case DECIMAL:
-        if (value != null) {
-          value = ((Decimal) value).toJavaBigDecimal();
-        }
-        return value;
-      default:
-        return value;
+    if (type == DataTypes.DECIMAL) {
+      if (value != null) {
+        value = ((Decimal) value).toJavaBigDecimal();
+      }
+      return value;
+    } else {
+      return value;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index d4a8dd6..3671316 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
@@ -345,26 +346,21 @@ public class IntermediateFileMerger implements Callable<Void> {
       for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
         if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
           stream.write((byte) 1);
-          switch (aggType[counter]) {
-            case SHORT:
-              stream.writeShort((short)NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case INT:
-              stream.writeInt((int)NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case LONG:
-              stream.writeLong((long)NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case DOUBLE:
-              stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
-              break;
-            case DECIMAL:
-              byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
-              stream.writeInt(bigDecimalInBytes.length);
-              stream.write(bigDecimalInBytes);
-              break;
-            default:
-              throw new IllegalArgumentException("unsupported data type:" + aggType[counter]);
+          DataType dataType = aggType[counter];
+          if (dataType == DataTypes.SHORT) {
+            stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.INT) {
+            stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.LONG) {
+            stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.DOUBLE) {
+            stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
+          } else if (dataType == DataTypes.DECIMAL) {
+            byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
+            stream.writeInt(bigDecimalInBytes.length);
+            stream.write(bigDecimalInBytes);
+          } else {
+            throw new IllegalArgumentException("unsupported data type:" + aggType[counter]);
           }
         } else {
           stream.write((byte) 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 11df276..5cc96c5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -293,27 +294,22 @@ public class SortDataRows {
           Object value = row[mesCount + dimColCount];
           if (null != value) {
             stream.write((byte) 1);
-            switch (type[mesCount]) {
-              case SHORT:
-                stream.writeShort((Short) value);
-                break;
-              case INT:
-                stream.writeInt((Integer) value);
-                break;
-              case LONG:
-                stream.writeLong((Long) value);
-                break;
-              case DOUBLE:
-                stream.writeDouble((Double) value);
-                break;
-              case DECIMAL:
-                BigDecimal val = (BigDecimal) value;
-                byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-                stream.writeInt(bigDecimalInBytes.length);
-                stream.write(bigDecimalInBytes);
-                break;
-              default:
-                throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
+            DataType dataType = type[mesCount];
+            if (dataType == DataTypes.SHORT) {
+              stream.writeShort((Short) value);
+            } else if (dataType == DataTypes.INT) {
+              stream.writeInt((Integer) value);
+            } else if (dataType == DataTypes.LONG) {
+              stream.writeLong((Long) value);
+            } else if (dataType == DataTypes.DOUBLE) {
+              stream.writeDouble((Double) value);
+            } else if (dataType == DataTypes.DECIMAL) {
+              BigDecimal val = (BigDecimal) value;
+              byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+              stream.writeInt(bigDecimalInBytes.length);
+              stream.write(bigDecimalInBytes);
+            } else {
+              throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
             }
           } else {
             stream.write((byte) 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 3e56605..5d339c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -343,27 +344,22 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
       // read measure values
       for (int i = 0; i < this.measureCount; i++) {
         if (stream.readByte() == 1) {
-          switch (aggType[i]) {
-            case SHORT:
-              measures[index++] = stream.readShort();
-              break;
-            case INT:
-              measures[index++] = stream.readInt();
-              break;
-            case LONG:
-              measures[index++] = stream.readLong();
-              break;
-            case DOUBLE:
-              measures[index++] = stream.readDouble();
-              break;
-            case DECIMAL:
-              int len = stream.readInt();
-              byte[] buff = new byte[len];
-              stream.readFully(buff);
-              measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
-              break;
-            default:
-              throw new IllegalArgumentException("unsupported data type:" + aggType[i]);
+          DataType dataType = aggType[i];
+          if (dataType == DataTypes.SHORT) {
+            measures[index++] = stream.readShort();
+          } else if (dataType == DataTypes.INT) {
+            measures[index++] = stream.readInt();
+          } else if (dataType == DataTypes.LONG) {
+            measures[index++] = stream.readLong();
+          } else if (dataType == DataTypes.DOUBLE) {
+            measures[index++] = stream.readDouble();
+          } else if (dataType == DataTypes.DECIMAL) {
+            int len = stream.readInt();
+            byte[] buff = new byte[len];
+            stream.readFully(buff);
+            measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
+          } else {
+            throw new IllegalArgumentException("unsupported data type:" + aggType[i]);
           }
         } else {
           measures[index++] = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 78f1637..fdf44cf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -503,7 +504,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     DataType[] type = model.getMeasureDataType();
     for (int j = 0; j < type.length; j++) {
-      if (type[j] != DataType.BYTE && type[j] != DataType.DECIMAL) {
+      if (type[j] != DataTypes.BYTE && type[j] != DataTypes.DECIMAL) {
         otherMeasureIndexList.add(j);
       } else {
         customMeasureIndexList.add(j);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 287de0a..d2cf1c4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 
 import org.apache.spark.sql.types.Decimal;
@@ -91,14 +92,14 @@ public class TablePage {
     dictDimensionPages = new ColumnPage[numDictDimension];
     for (int i = 0; i < dictDimensionPages.length; i++) {
       TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
-      ColumnPage page = ColumnPage.newPage(spec, DataType.BYTE_ARRAY, pageSize);
-      page.setStatsCollector(KeyPageStatsCollector.newInstance(DataType.BYTE_ARRAY));
+      ColumnPage page = ColumnPage.newPage(spec, DataTypes.BYTE_ARRAY, pageSize);
+      page.setStatsCollector(KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY));
       dictDimensionPages[i] = page;
     }
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPages.length; i++) {
       TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i + numDictDimension);
-      ColumnPage page = ColumnPage.newPage(spec, DataType.STRING, pageSize);
+      ColumnPage page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
       page.setStatsCollector(LVStringStatsCollector.newInstance());
       noDictDimensionPages[i] = page;
     }
@@ -113,7 +114,7 @@ public class TablePage {
     for (int i = 0; i < measurePages.length; i++) {
       TableSpec.MeasureSpec spec = model.getTableSpec().getMeasureSpec(i);
       ColumnPage page;
-      if (spec.getSchemaDataType() == DataType.DECIMAL) {
+      if (spec.getSchemaDataType() == DataTypes.DECIMAL) {
         page = ColumnPage.newDecimalPage(spec, dataTypes[i], pageSize);
       } else {
         page = ColumnPage.newPage(spec, dataTypes[i], pageSize);
@@ -182,7 +183,7 @@ public class TablePage {
 
       // in compaction flow the measure with decimal type will come as Spark decimal.
       // need to convert it to byte array.
-      if (measurePages[i].getDataType() == DataType.DECIMAL &&
+      if (measurePages[i].getDataType() == DataTypes.DECIMAL &&
           model.isCompactionFlow() &&
           value != null) {
         value = ((Decimal) value).toJavaBigDecimal();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 79e49ef..1c7f9e7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
@@ -278,8 +279,8 @@ public final class CarbonDataProcessorUtil {
     StringBuilder dimString = new StringBuilder();
     for (int i = 0; i < dataFields.length; i++) {
       DataField dataField = dataFields[i];
-      if (dataField.getColumn().getDataType().equals(DataType.ARRAY) || dataField.getColumn()
-          .getDataType().equals(DataType.STRUCT)) {
+      if (dataField.getColumn().getDataType() == DataTypes.ARRAY ||
+          dataField.getColumn().getDataType() == DataTypes.STRUCT) {
         addAllComplexTypeChildren((CarbonDimension) dataField.getColumn(), dimString, "");
         dimString.append(CarbonCommonConstants.SEMICOLON_SPC_CHARACTER);
       }
@@ -321,22 +322,19 @@ public final class CarbonDataProcessorUtil {
     for (int i = 0; i < hierarchies.length; i++) {
       String[] levels = hierarchies[i].split(CarbonCommonConstants.HASH_SPC_CHARACTER);
       String[] levelInfo = levels[0].split(CarbonCommonConstants.COLON_SPC_CHARACTER);
-      GenericDataType g = levelInfo[1].equals(CarbonCommonConstants.ARRAY) ?
+      GenericDataType g = levelInfo[1].toLowerCase().contains(CarbonCommonConstants.ARRAY) ?
           new ArrayDataType(levelInfo[0], "", levelInfo[3]) :
           new StructDataType(levelInfo[0], "", levelInfo[3]);
       complexTypesMap.put(levelInfo[0], g);
       for (int j = 1; j < levels.length; j++) {
         levelInfo = levels[j].split(CarbonCommonConstants.COLON_SPC_CHARACTER);
-        switch (levelInfo[1]) {
-          case CarbonCommonConstants.ARRAY:
-            g.addChildren(new ArrayDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
-            break;
-          case CarbonCommonConstants.STRUCT:
-            g.addChildren(new StructDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
-            break;
-          default:
-            g.addChildren(new PrimitiveDataType(levelInfo[0], levelInfo[2], levelInfo[3],
-                Integer.parseInt(levelInfo[4])));
+        if (levelInfo[1].toLowerCase().contains(CarbonCommonConstants.ARRAY)) {
+          g.addChildren(new ArrayDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
+        } else if (levelInfo[1].toLowerCase().contains(CarbonCommonConstants.STRUCT)) {
+          g.addChildren(new StructDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
+        } else {
+          g.addChildren(new PrimitiveDataType(levelInfo[0], levelInfo[2], levelInfo[3],
+              Integer.parseInt(levelInfo[4])));
         }
       }
     }
@@ -396,7 +394,7 @@ public final class CarbonDataProcessorUtil {
       String tableName) {
     DataType[] type = new DataType[measureCount];
     for (int i = 0; i < type.length; i++) {
-      type[i] = DataType.DOUBLE;
+      type[i] = DataTypes.DOUBLE;
     }
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
         databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
@@ -458,7 +456,7 @@ public final class CarbonDataProcessorUtil {
       int measureCount) {
     DataType[] type = new DataType[measureCount];
     for (int i = 0; i < type.length; i++) {
-      type[i] = DataType.DOUBLE;
+      type[i] = DataTypes.DOUBLE;
     }
     List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
     for (int i = 0; i < measureCount; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 61771ea..37331c9 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
@@ -183,7 +184,7 @@ public class StoreCreator {
     ColumnSchema id = new ColumnSchema();
     id.setColumnName("ID");
     id.setColumnar(true);
-    id.setDataType(DataType.INT);
+    id.setDataType(DataTypes.INT);
     id.setEncodingList(encodings);
     id.setColumnUniqueId(UUID.randomUUID().toString());
     id.setDimensionColumn(true);
@@ -193,7 +194,7 @@ public class StoreCreator {
     ColumnSchema date = new ColumnSchema();
     date.setColumnName("date");
     date.setColumnar(true);
-    date.setDataType(DataType.STRING);
+    date.setDataType(DataTypes.STRING);
     date.setEncodingList(encodings);
     date.setColumnUniqueId(UUID.randomUUID().toString());
     date.setDimensionColumn(true);
@@ -203,7 +204,7 @@ public class StoreCreator {
     ColumnSchema country = new ColumnSchema();
     country.setColumnName("country");
     country.setColumnar(true);
-    country.setDataType(DataType.STRING);
+    country.setDataType(DataTypes.STRING);
     country.setEncodingList(encodings);
     country.setColumnUniqueId(UUID.randomUUID().toString());
     country.setDimensionColumn(true);
@@ -213,7 +214,7 @@ public class StoreCreator {
     ColumnSchema name = new ColumnSchema();
     name.setColumnName("name");
     name.setColumnar(true);
-    name.setDataType(DataType.STRING);
+    name.setDataType(DataTypes.STRING);
     name.setEncodingList(encodings);
     name.setColumnUniqueId(UUID.randomUUID().toString());
     name.setDimensionColumn(true);
@@ -223,7 +224,7 @@ public class StoreCreator {
     ColumnSchema phonetype = new ColumnSchema();
     phonetype.setColumnName("phonetype");
     phonetype.setColumnar(true);
-    phonetype.setDataType(DataType.STRING);
+    phonetype.setDataType(DataTypes.STRING);
     phonetype.setEncodingList(encodings);
     phonetype.setColumnUniqueId(UUID.randomUUID().toString());
     phonetype.setDimensionColumn(true);
@@ -233,7 +234,7 @@ public class StoreCreator {
     ColumnSchema serialname = new ColumnSchema();
     serialname.setColumnName("serialname");
     serialname.setColumnar(true);
-    serialname.setDataType(DataType.STRING);
+    serialname.setDataType(DataTypes.STRING);
     serialname.setEncodingList(encodings);
     serialname.setColumnUniqueId(UUID.randomUUID().toString());
     serialname.setDimensionColumn(true);
@@ -243,7 +244,7 @@ public class StoreCreator {
     ColumnSchema salary = new ColumnSchema();
     salary.setColumnName("salary");
     salary.setColumnar(true);
-    salary.setDataType(DataType.INT);
+    salary.setDataType(DataTypes.INT);
     salary.setEncodingList(new ArrayList<Encoding>());
     salary.setColumnUniqueId(UUID.randomUUID().toString());
     salary.setDimensionColumn(false);