Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:21:55 UTC

[05/35] carbondata git commit: [CARBONDATA-1539] Change data type from enum to class
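
The change is mechanical but wide: the old DataType enum becomes a class, its values move to singleton constants on a new DataTypes holder, and every exhaustive switch (dataType) turns into a reference-equality if/else chain. A minimal sketch of the new pattern (the describe method and its return strings are illustrative, not part of the commit):

    import org.apache.carbondata.core.metadata.datatype.DataType;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;

    public class DataTypeDispatch {
      // Before: DataType was an enum, so callers could write
      //   switch (dataType) { case INT: ... case STRING: ... }
      // After: DataTypes exposes singleton DataType instances, so
      // dispatch compares references instead.
      static String describe(DataType dataType) {
        if (dataType == DataTypes.INT) {
          return "int";
        } else if (dataType == DataTypes.STRING) {
          return "string";
        } else {
          return "other: " + dataType;
        }
      }
    }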

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 4b04116..ea90bbf 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -50,6 +50,7 @@ import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
@@ -187,7 +188,7 @@ public class StoreCreator {
     ColumnSchema id = new ColumnSchema();
     id.setColumnName("ID");
     id.setColumnar(true);
-    id.setDataType(DataType.INT);
+    id.setDataType(DataTypes.INT);
     id.setEncodingList(encodings);
     id.setColumnUniqueId(UUID.randomUUID().toString());
     id.setDimensionColumn(true);
@@ -197,7 +198,7 @@ public class StoreCreator {
     ColumnSchema date = new ColumnSchema();
     date.setColumnName("date");
     date.setColumnar(true);
-    date.setDataType(DataType.STRING);
+    date.setDataType(DataTypes.STRING);
     date.setEncodingList(encodings);
     date.setColumnUniqueId(UUID.randomUUID().toString());
     date.setDimensionColumn(true);
@@ -208,7 +209,7 @@ public class StoreCreator {
     ColumnSchema country = new ColumnSchema();
     country.setColumnName("country");
     country.setColumnar(true);
-    country.setDataType(DataType.STRING);
+    country.setDataType(DataTypes.STRING);
     country.setEncodingList(encodings);
     country.setColumnUniqueId(UUID.randomUUID().toString());
     country.setDimensionColumn(true);
@@ -219,7 +220,7 @@ public class StoreCreator {
     ColumnSchema name = new ColumnSchema();
     name.setColumnName("name");
     name.setColumnar(true);
-    name.setDataType(DataType.STRING);
+    name.setDataType(DataTypes.STRING);
     name.setEncodingList(encodings);
     name.setColumnUniqueId(UUID.randomUUID().toString());
     name.setDimensionColumn(true);
@@ -230,7 +231,7 @@ public class StoreCreator {
     ColumnSchema phonetype = new ColumnSchema();
     phonetype.setColumnName("phonetype");
     phonetype.setColumnar(true);
-    phonetype.setDataType(DataType.STRING);
+    phonetype.setDataType(DataTypes.STRING);
     phonetype.setEncodingList(encodings);
     phonetype.setColumnUniqueId(UUID.randomUUID().toString());
     phonetype.setDimensionColumn(true);
@@ -241,7 +242,7 @@ public class StoreCreator {
     ColumnSchema serialname = new ColumnSchema();
     serialname.setColumnName("serialname");
     serialname.setColumnar(true);
-    serialname.setDataType(DataType.STRING);
+    serialname.setDataType(DataTypes.STRING);
     serialname.setEncodingList(encodings);
     serialname.setColumnUniqueId(UUID.randomUUID().toString());
     serialname.setDimensionColumn(true);
@@ -252,7 +253,7 @@ public class StoreCreator {
     ColumnSchema salary = new ColumnSchema();
     salary.setColumnName("salary");
     salary.setColumnar(true);
-    salary.setDataType(DataType.INT);
+    salary.setDataType(DataTypes.INT);
     salary.setEncodingList(new ArrayList<Encoding>());
     salary.setColumnUniqueId(UUID.randomUUID().toString());
     salary.setDimensionColumn(false);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
index 34a1936..36ae65c 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -136,13 +137,12 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T
    */
   private Writable createWritableObject(Object obj, CarbonColumn carbonColumn) throws IOException {
     DataType dataType = carbonColumn.getDataType();
-    switch (dataType) {
-      case STRUCT:
-        return createStruct(obj, carbonColumn);
-      case ARRAY:
-        return createArray(obj, carbonColumn);
-      default:
-        return createWritablePrimitive(obj, carbonColumn);
+    if (dataType == DataTypes.STRUCT) {
+      return createStruct(obj, carbonColumn);
+    } else if (dataType == DataTypes.ARRAY) {
+      return createArray(obj, carbonColumn);
+    } else {
+      return createWritablePrimitive(obj, carbonColumn);
     }
   }
 
@@ -219,77 +219,30 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T
     if (obj == null) {
       return null;
     }
-    switch (dataType) {
-      case NULL:
-        return null;
-      case DOUBLE:
-        return new DoubleWritable((double) obj);
-      case INT:
-        return new IntWritable((int) obj);
-      case LONG:
-        return new LongWritable((long) obj);
-      case SHORT:
-        return new ShortWritable((short) obj);
-      case DATE:
-        Calendar c = Calendar.getInstance();
-        c.setTime(new Date(0));
-        c.add(Calendar.DAY_OF_YEAR, (Integer) obj);
-        Date date = new java.sql.Date(c.getTime().getTime());
-        return new DateWritable(date);
-      case TIMESTAMP:
-        return new TimestampWritable(new Timestamp((long) obj / 1000));
-      case STRING:
-        return new Text(obj.toString());
-      case DECIMAL:
-        return new HiveDecimalWritable(
-            HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
-      default:
-        throw new IOException("unsupported data type:" + dataType);
-    }
-  }
-
-  /**
-   * If we need to use the same Writable[] then we can use this method
-   *
-   * @param writable
-   * @param obj
-   * @param carbonColumn
-   * @throws IOException
-   */
-  private void setPrimitive(Writable writable, Object obj, CarbonColumn carbonColumn)
-      throws IOException {
-    DataType dataType = carbonColumn.getDataType();
-    if (obj == null) {
-      writable.write(null);
-    }
-    switch (dataType) {
-      case DOUBLE:
-        ((DoubleWritable) writable).set((double) obj);
-        break;
-      case INT:
-        ((IntWritable) writable).set((int) obj);
-        break;
-      case LONG:
-        ((LongWritable) writable).set((long) obj);
-        break;
-      case SHORT:
-        ((ShortWritable) writable).set((short) obj);
-        break;
-      case DATE:
-        ((DateWritable) writable).set(new Date((Long) obj));
-        break;
-      case TIMESTAMP:
-        ((TimestampWritable) writable).set(new Timestamp((long) obj));
-        break;
-      case STRING:
-        ((Text) writable).set(obj.toString());
-        break;
-      case DECIMAL:
-        ((HiveDecimalWritable) writable)
-            .set(HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
-        break;
-      default:
-        throw new IOException("unsupported data type:" + dataType);
+    if (dataType == DataTypes.NULL) {
+      return null;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return new DoubleWritable((double) obj);
+    } else if (dataType == DataTypes.INT) {
+      return new IntWritable((int) obj);
+    } else if (dataType == DataTypes.LONG) {
+      return new LongWritable((long) obj);
+    } else if (dataType == DataTypes.SHORT) {
+      return new ShortWritable((short) obj);
+    } else if (dataType == DataTypes.DATE) {
+      Calendar c = Calendar.getInstance();
+      c.setTime(new Date(0));
+      c.add(Calendar.DAY_OF_YEAR, (Integer) obj);
+      Date date = new java.sql.Date(c.getTime().getTime());
+      return new DateWritable(date);
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      return new TimestampWritable(new Timestamp((long) obj / 1000));
+    } else if (dataType == DataTypes.STRING) {
+      return new Text(obj.toString());
+    } else if (dataType == DataTypes.DECIMAL) {
+      return new HiveDecimalWritable(HiveDecimal.create(new java.math.BigDecimal(obj.toString())));
+    } else {
+      throw new IOException("unsupported data type:" + dataType);
     }
   }
 

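The DATE branch above rebuilds a java.sql.Date from an integer surrogate stored as days since the Unix epoch, while the TIMESTAMP branch divides by 1000, which suggests the stored long is in microseconds. A standalone sketch of the date arithmetic (the helper name is illustrative):

    import java.sql.Date;
    import java.util.Calendar;

    class DateDecodeSketch {
      // Same steps as the DATE branch: start at the epoch, add the
      // stored day offset, and wrap the resulting millis in a Date.
      static Date daysSinceEpochToDate(int days) {
        Calendar c = Calendar.getInstance();
        c.setTime(new Date(0));
        c.add(Calendar.DAY_OF_YEAR, days);
        return new Date(c.getTime().getTime());
      }

      public static void main(String[] args) {
        // Prints a day in late 2016; exact output depends on the zone.
        System.out.println(daysSinceEpochToDate(17104));
      }
    }
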
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
index 6cb2915..e2c9c68 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
@@ -6,28 +6,28 @@ import org.apache.spark.sql.types.DataTypes;
 
 public class CarbonTypeUtil {
 
-  public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+  static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
       DataType carbonDataType) {
-    switch (carbonDataType) {
-      case STRING:
-        return DataTypes.StringType;
-      case SHORT:
-        return DataTypes.ShortType;
-      case INT:
-        return DataTypes.IntegerType;
-      case LONG:
+    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+      return DataTypes.StringType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+      return DataTypes.ShortType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+      return DataTypes.IntegerType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
         return DataTypes.LongType;
-      case DOUBLE:
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
         return DataTypes.DoubleType;
-      case BOOLEAN:
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
         return DataTypes.BooleanType;
-      case DECIMAL:
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL) {
         return DataTypes.createDecimalType();
-      case TIMESTAMP:
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
         return DataTypes.TimestampType;
-      case DATE:
-        return DataTypes.DateType;
-      default: return null;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+      return DataTypes.DateType;
+    } else {
+      return null;
     }
   }
 

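In CarbonTypeUtil the short name DataTypes is already taken by Spark's org.apache.spark.sql.types.DataTypes (imported at the top of the hunk), and Java has no import renaming, so the Carbon constants have to be spelled out fully qualified. A condensed sketch of the collision, trimmed to two branches:

    import org.apache.spark.sql.types.DataTypes; // Spark's class wins the short name

    class TypeMappingSketch {
      static org.apache.spark.sql.types.DataType toSpark(
          org.apache.carbondata.core.metadata.datatype.DataType carbonType) {
        // Carbon's DataTypes must be fully qualified to avoid the clash.
        if (carbonType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
          return DataTypes.StringType;
        } else if (carbonType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
          return DataTypes.IntegerType;
        } else {
          return null;
        }
      }
    }

The Scala files further down sidestep the same clash by renaming on import, e.g. {DataTypes => CarbonDataTypes} in CarbonScalaUtil.scala.
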
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
index f474433..2db2d23 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
@@ -193,28 +194,24 @@ class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
             null);
       } else {
         fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            CarbonTypeUtil.convertCarbonToSparkDataType(DataType.INT), true, null);
+            CarbonTypeUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null);
       }
     }
 
     for (int i = 0; i < queryMeasures.size(); i++) {
       QueryMeasure msr = queryMeasures.get(i);
-      switch (msr.getMeasure().getDataType()) {
-        case SHORT:
-        case INT:
-        case LONG:
-          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-              CarbonTypeUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
-              null);
-          break;
-        case DECIMAL:
-          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-              new DecimalType(msr.getMeasure().getPrecision(),
-                  msr.getMeasure().getScale()), true, null);
-          break;
-        default:
-          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-              CarbonTypeUtil.convertCarbonToSparkDataType(DataType.DOUBLE), true, null);
+      DataType dataType = msr.getMeasure().getDataType();
+      if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
+        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+            CarbonTypeUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
+            null);
+      } else if (dataType == DataTypes.DECIMAL) {
+        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+            new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
+            null);
+      } else {
+        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+            CarbonTypeUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index a81c06f..99109d1 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.presto;
 
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 import com.facebook.presto.spi.*;
@@ -221,32 +222,30 @@ public class CarbondataMetadata implements ConnectorMetadata {
 
   public static Type carbonDataType2SpiMapper(ColumnSchema columnSchema) {
     DataType colType = columnSchema.getDataType();
-    switch (colType) {
-      case BOOLEAN:
-        return BooleanType.BOOLEAN;
-      case SHORT:
-        return SmallintType.SMALLINT;
-      case INT:
-        return IntegerType.INTEGER;
-      case LONG:
-        return BigintType.BIGINT;
-      case FLOAT:
-      case DOUBLE:
-        return DoubleType.DOUBLE;
-      case DECIMAL:
-        if(columnSchema.getPrecision() > 0){
-          return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
-        } else {
-          return DecimalType.createDecimalType();
-        }
-      case STRING:
-        return VarcharType.VARCHAR;
-      case DATE:
-        return DateType.DATE;
-      case TIMESTAMP:
-        return TimestampType.TIMESTAMP;
-      default:
-        return VarcharType.VARCHAR;
+    if (colType == DataTypes.BOOLEAN) {
+      return BooleanType.BOOLEAN;
+    } else if (colType == DataTypes.SHORT) {
+      return SmallintType.SMALLINT;
+    } else if (colType == DataTypes.INT) {
+      return IntegerType.INTEGER;
+    } else if (colType == DataTypes.LONG) {
+      return BigintType.BIGINT;
+    } else if (colType == DataTypes.FLOAT || colType == DataTypes.DOUBLE) {
+      return DoubleType.DOUBLE;
+    } else if (colType == DataTypes.DECIMAL) {
+      if (columnSchema.getPrecision() > 0) {
+        return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
+      } else {
+        return DecimalType.createDecimalType();
+      }
+    } else if (colType == DataTypes.STRING) {
+      return VarcharType.VARCHAR;
+    } else if (colType == DataTypes.DATE) {
+      return DateType.DATE;
+    } else if (colType == DataTypes.TIMESTAMP) {
+      return TimestampType.TIMESTAMP;
+    } else {
+      return VarcharType.VARCHAR;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
index a958e63..c9fb177 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
@@ -69,17 +70,17 @@ public class PrestoFilterUtil {
 
   private static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
     Type colType = carbondataColumnHandle.getColumnType();
-    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
-    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
-    else if (colType == IntegerType.INTEGER) return DataType.INT;
-    else if (colType == BigintType.BIGINT) return DataType.LONG;
-    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
-    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
-    else if (colType == DateType.DATE) return DataType.DATE;
-    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    if (colType == BooleanType.BOOLEAN) return DataTypes.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataTypes.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataTypes.INT;
+    else if (colType == BigintType.BIGINT) return DataTypes.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataTypes.DOUBLE;
+    else if (colType == VarcharType.VARCHAR) return DataTypes.STRING;
+    else if (colType == DateType.DATE) return DataTypes.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataTypes.TIMESTAMP;
     else if (colType.equals(DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
-        carbondataColumnHandle.getScale()))) return DataType.DECIMAL;
-    else return DataType.STRING;
+        carbondataColumnHandle.getScale()))) return DataTypes.DECIMAL;
+    else return DataTypes.STRING;
   }
 
   /**
@@ -171,10 +172,10 @@ public class PrestoFilterUtil {
       }
       if (singleValues.size() == 1) {
         Expression ex;
-        if (coltype.equals(DataType.STRING)) {
+        if (coltype.equals(DataTypes.STRING)) {
           ex = new EqualToExpression(colExpression,
               new LiteralExpression(singleValues.get(0), coltype));
-        } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
+        } else if (coltype.equals(DataTypes.TIMESTAMP) || coltype.equals(DataTypes.DATE)) {
           Long value = (Long) singleValues.get(0);
           ex = new EqualToExpression(colExpression, new LiteralExpression(value, coltype));
         } else ex = new EqualToExpression(colExpression,

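Note that PrestoFilterUtil mixes the two comparison styles: Spi2CarbondataTypeMapper uses ==, while the filter builder in the second hunk uses .equals(...). With the old enum both were interchangeable; after this change they stay equivalent only if each DataTypes constant is a singleton, which the identity comparisons throughout the commit already rely on. A quick sketch under that assumption:

    import org.apache.carbondata.core.metadata.datatype.DataType;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;

    class EqualitySketch {
      public static void main(String[] args) {
        DataType t = DataTypes.STRING;
        System.out.println(t == DataTypes.STRING);      // identity: the dominant style
        System.out.println(t.equals(DataTypes.STRING)); // equals: used in the filter builder
      }
    }

Both lines print true for a singleton constant.
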
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
index e3985e0..b9a9f0d 100644
--- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
@@ -21,10 +21,9 @@ import io.airlift.slice.{Slice, Slices}
 import io.airlift.slice.Slices._
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
-import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryChunksWrapper,
-DictionaryColumnUniqueIdentifier}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryChunksWrapper, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
@@ -111,7 +110,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
       data.map { value =>
         DataTypeUtil
           .getDataBasedOnDataType(dictionaries(columnNo)
-            .getDictionaryValueForKey(value.asInstanceOf[Int]), DataType.STRING)
+            .getDictionaryValueForKey(value.asInstanceOf[Int]), DataTypes.STRING)
       }
     } else {
       data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 07f9699..0a811b7 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl, FileWriteOperation}
 import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, CarbonMeasure, ColumnSchema}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
@@ -153,7 +153,7 @@ object CarbonDataStoreCreator {
     val id: ColumnSchema = new ColumnSchema()
     id.setColumnName("ID")
     id.setColumnar(true)
-    id.setDataType(DataType.INT)
+    id.setDataType(DataTypes.INT)
     id.setEncodingList(encodings)
     id.setColumnUniqueId(UUID.randomUUID().toString)
     id.setColumnReferenceId(id.getColumnUniqueId)
@@ -169,7 +169,7 @@ object CarbonDataStoreCreator {
     val date: ColumnSchema = new ColumnSchema()
     date.setColumnName("date")
     date.setColumnar(true)
-    date.setDataType(DataType.DATE)
+    date.setDataType(DataTypes.DATE)
     date.setEncodingList(dictEncoding)
     date.setColumnUniqueId(UUID.randomUUID().toString)
     date.setDimensionColumn(true)
@@ -180,7 +180,7 @@ object CarbonDataStoreCreator {
     val country: ColumnSchema = new ColumnSchema()
     country.setColumnName("country")
     country.setColumnar(true)
-    country.setDataType(DataType.STRING)
+    country.setDataType(DataTypes.STRING)
     country.setEncodingList(encodings)
     country.setColumnUniqueId(UUID.randomUUID().toString)
     country.setColumnReferenceId(country.getColumnUniqueId)
@@ -192,7 +192,7 @@ object CarbonDataStoreCreator {
     val name: ColumnSchema = new ColumnSchema()
     name.setColumnName("name")
     name.setColumnar(true)
-    name.setDataType(DataType.STRING)
+    name.setDataType(DataTypes.STRING)
     name.setEncodingList(encodings)
     name.setColumnUniqueId(UUID.randomUUID().toString)
     name.setDimensionColumn(true)
@@ -203,7 +203,7 @@ object CarbonDataStoreCreator {
     val phonetype: ColumnSchema = new ColumnSchema()
     phonetype.setColumnName("phonetype")
     phonetype.setColumnar(true)
-    phonetype.setDataType(DataType.STRING)
+    phonetype.setDataType(DataTypes.STRING)
     phonetype.setEncodingList(encodings)
     phonetype.setColumnUniqueId(UUID.randomUUID().toString)
     phonetype.setDimensionColumn(true)
@@ -214,7 +214,7 @@ object CarbonDataStoreCreator {
     val serialname: ColumnSchema = new ColumnSchema()
     serialname.setColumnName("serialname")
     serialname.setColumnar(true)
-    serialname.setDataType(DataType.STRING)
+    serialname.setDataType(DataTypes.STRING)
     serialname.setEncodingList(encodings)
     serialname.setColumnUniqueId(UUID.randomUUID().toString)
     serialname.setDimensionColumn(true)
@@ -225,7 +225,7 @@ object CarbonDataStoreCreator {
     val salary: ColumnSchema = new ColumnSchema()
     salary.setColumnName("salary")
     salary.setColumnar(true)
-    salary.setDataType(DataType.DOUBLE)
+    salary.setDataType(DataTypes.DOUBLE)
     salary.setEncodingList(encodings)
     salary.setColumnUniqueId(UUID.randomUUID().toString)
     salary.setDimensionColumn(false)
@@ -236,7 +236,7 @@ object CarbonDataStoreCreator {
     val bonus: ColumnSchema = new ColumnSchema()
     bonus.setColumnName("bonus")
     bonus.setColumnar(true)
-    bonus.setDataType(DataType.DECIMAL)
+    bonus.setDataType(DataTypes.DECIMAL)
     bonus.setPrecision(10)
     bonus.setScale(4)
     bonus.setEncodingList(encodings)
@@ -249,7 +249,7 @@ object CarbonDataStoreCreator {
     val dob: ColumnSchema = new ColumnSchema()
     dob.setColumnName("dob")
     dob.setColumnar(true)
-    dob.setDataType(DataType.TIMESTAMP)
+    dob.setDataType(DataTypes.TIMESTAMP)
     dob.setEncodingList(dictEncoding)
     dob.setColumnUniqueId(UUID.randomUUID().toString)
     dob.setDimensionColumn(true)
@@ -260,7 +260,7 @@ object CarbonDataStoreCreator {
     val shortField: ColumnSchema = new ColumnSchema()
     shortField.setColumnName("shortField")
     shortField.setColumnar(true)
-    shortField.setDataType(DataType.SHORT)
+    shortField.setDataType(DataTypes.SHORT)
     shortField.setEncodingList(encodings)
     shortField.setColumnUniqueId(UUID.randomUUID().toString)
     shortField.setDimensionColumn(false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 0b5141e..ba6ad31 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.events.ChangeEvent
 import org.apache.carbondata.core.indexstore.schema.FilterType
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.util.CarbonProperties
 
 class C2DataMapFactory() extends DataMapFactory {
@@ -163,7 +163,7 @@ object DataMapWriterSuite {
         pageId: Int,
         pages: Array[ColumnPage]): Unit = {
       assert(pages.length == 1)
-      assert(pages(0).getDataType == DataType.STRING)
+      assert(pages(0).getDataType == DataTypes.STRING)
       val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
       assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
       callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index c0dba74..3f99922 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -18,9 +18,10 @@
 package org.apache.carbondata.spark.testsuite.partition
 
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.util.CarbonProperties
@@ -53,7 +54,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
-    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataType.INT)
+    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 0)
     assert(partitionInfo.getPartitionType ==  PartitionType.HASH)
     assert(partitionInfo.getNumPartitions == 3)
@@ -76,7 +77,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
-    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataType.TIMESTAMP)
+    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)
@@ -103,7 +104,7 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("workgroupcategory"))
-    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataType.STRING)
+    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.STRING)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 1)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.INVERTED_INDEX)
     assert(partitionInfo.getPartitionType == PartitionType.LIST)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
index 1d660e8..317e2e2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTableWithDefaultProperties.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.testsuite.partition
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.util.CarbonProperties
@@ -48,7 +48,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("empno"))
-    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataType.INT)
+    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.INT)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 0)
     assert(partitionInfo.getPartitionType ==  PartitionType.HASH)
     assert(partitionInfo.getNumPartitions == 3)
@@ -71,7 +71,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("doj"))
-    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataType.TIMESTAMP)
+    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)
@@ -99,7 +99,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
-    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataType.TIMESTAMP)
+    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.TIMESTAMP)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)
@@ -131,7 +131,7 @@ class TestDDLForPartitionTableWithDefaultProperties  extends QueryTest with Befo
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     assert(partitionInfo != null)
     assert(partitionInfo.getColumnSchemaList.get(0).getColumnName.equalsIgnoreCase("projectenddate"))
-    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataType.DATE)
+    assert(partitionInfo.getColumnSchemaList.get(0).getDataType == DataTypes.DATE)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.size == 3)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(0) == Encoding.DICTIONARY)
     assert(partitionInfo.getColumnSchemaList.get(0).getEncodingList.get(1) == Encoding.DIRECT_DICTIONARY)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index a038ff3..bf46f67 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io.{ByteArrayInputStream, DataInputStream}
-
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
@@ -26,7 +24,7 @@ import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext
 import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, CarbonTaskInfo, SessionParams, TaskMetricsMap, ThreadLocalSessionInfo, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util._
 
 /**
  * This RDD maintains session level ThreadLocal

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index d38be0a..80cbab8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, CarbonMeasure}
@@ -204,7 +204,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
       } else {  // normal dictionary
         val dict = CarbonLoaderUtil.getDictionary(carbonTableIdentifier,
           dimension.getColumnIdentifier, storePath, partitionDataType)
-        if (partitionDataType == DataType.STRING) {
+        if (partitionDataType == DataTypes.STRING) {
           if (partitionType == PartitionType.RANGE) {
             partitionValue = ByteUtil.
               toBytes(dict.getDictionaryValueForKey(keyArray(partColIdx).toInt))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 65f2ba2..cf37a18 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -25,26 +25,26 @@ import org.apache.spark.sql.execution.command.DataTypeInfo
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType}
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(
       dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
     dataType match {
-      case StringType => CarbonDataType.STRING
-      case ShortType => CarbonDataType.SHORT
-      case IntegerType => CarbonDataType.INT
-      case LongType => CarbonDataType.LONG
-      case DoubleType => CarbonDataType.DOUBLE
-      case FloatType => CarbonDataType.FLOAT
-      case DateType => CarbonDataType.DATE
-      case BooleanType => CarbonDataType.BOOLEAN
-      case TimestampType => CarbonDataType.TIMESTAMP
-      case ArrayType(_, _) => CarbonDataType.ARRAY
-      case StructType(_) => CarbonDataType.STRUCT
-      case NullType => CarbonDataType.NULL
-      case _ => CarbonDataType.DECIMAL
+      case StringType => CarbonDataTypes.STRING
+      case ShortType => CarbonDataTypes.SHORT
+      case IntegerType => CarbonDataTypes.INT
+      case LongType => CarbonDataTypes.LONG
+      case DoubleType => CarbonDataTypes.DOUBLE
+      case FloatType => CarbonDataTypes.FLOAT
+      case DateType => CarbonDataTypes.DATE
+      case BooleanType => CarbonDataTypes.BOOLEAN
+      case TimestampType => CarbonDataTypes.TIMESTAMP
+      case ArrayType(_, _) => CarbonDataTypes.ARRAY
+      case StructType(_) => CarbonDataTypes.STRUCT
+      case NullType => CarbonDataTypes.NULL
+      case _ => CarbonDataTypes.DECIMAL
     }
   }
 
@@ -67,15 +67,15 @@ object CarbonScalaUtil {
 
   def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
     dataType match {
-      case CarbonDataType.STRING => StringType
-      case CarbonDataType.SHORT => ShortType
-      case CarbonDataType.INT => IntegerType
-      case CarbonDataType.LONG => LongType
-      case CarbonDataType.DOUBLE => DoubleType
-      case CarbonDataType.BOOLEAN => BooleanType
-      case CarbonDataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
-      case CarbonDataType.TIMESTAMP => TimestampType
-      case CarbonDataType.DATE => DateType
+      case CarbonDataTypes.STRING => StringType
+      case CarbonDataTypes.SHORT => ShortType
+      case CarbonDataTypes.INT => IntegerType
+      case CarbonDataTypes.LONG => LongType
+      case CarbonDataTypes.DOUBLE => DoubleType
+      case CarbonDataTypes.BOOLEAN => BooleanType
+      case CarbonDataTypes.DECIMAL => DecimalType.SYSTEM_DEFAULT
+      case CarbonDataTypes.TIMESTAMP => TimestampType
+      case CarbonDataTypes.DATE => DateType
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index df25a37..bc24c12 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.row.LoadStatusType
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -328,14 +328,14 @@ object CommonUtil {
     }
     val comparator = Comparator.getComparator(columnDataType)
     var head = columnDataType match {
-      case DataType.STRING => ByteUtil.toBytes(rangeInfo.head)
+      case DataTypes.STRING => ByteUtil.toBytes(rangeInfo.head)
       case _ => PartitionUtil.getDataBasedOnDataType(rangeInfo.head, columnDataType,
         timestampFormatter, dateFormatter)
     }
     val iterator = rangeInfo.tail.toIterator
     while (iterator.hasNext) {
       val next = columnDataType match {
-        case DataType.STRING => ByteUtil.toBytes(iterator.next())
+        case DataTypes.STRING => ByteUtil.toBytes(iterator.next())
         case _ => PartitionUtil.getDataBasedOnDataType(iterator.next(), columnDataType,
           timestampFormatter, dateFormatter)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index fef542a..027c654 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.spark.util
 
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.format.{DataType => ThriftDataType}
 
 object DataTypeConverterUtil {
@@ -26,51 +26,51 @@ object DataTypeConverterUtil {
 
   def convertToCarbonType(dataType: String): DataType = {
     dataType.toLowerCase match {
-      case "string" => DataType.STRING
-      case "int" => DataType.INT
-      case "integer" => DataType.INT
-      case "tinyint" => DataType.SHORT
-      case "smallint" => DataType.SHORT
-      case "long" => DataType.LONG
-      case "bigint" => DataType.LONG
-      case "numeric" => DataType.DOUBLE
-      case "double" => DataType.DOUBLE
-      case "float" => DataType.DOUBLE
-      case "decimal" => DataType.DECIMAL
-      case FIXED_DECIMAL(_, _) => DataType.DECIMAL
-      case "timestamp" => DataType.TIMESTAMP
-      case "date" => DataType.DATE
-      case "array" => DataType.ARRAY
-      case "struct" => DataType.STRUCT
+      case "string" => DataTypes.STRING
+      case "int" => DataTypes.INT
+      case "integer" => DataTypes.INT
+      case "tinyint" => DataTypes.SHORT
+      case "smallint" => DataTypes.SHORT
+      case "long" => DataTypes.LONG
+      case "bigint" => DataTypes.LONG
+      case "numeric" => DataTypes.DOUBLE
+      case "double" => DataTypes.DOUBLE
+      case "float" => DataTypes.DOUBLE
+      case "decimal" => DataTypes.DECIMAL
+      case FIXED_DECIMAL(_, _) => DataTypes.DECIMAL
+      case "timestamp" => DataTypes.TIMESTAMP
+      case "date" => DataTypes.DATE
+      case "array" => DataTypes.ARRAY
+      case "struct" => DataTypes.STRUCT
       case _ => convertToCarbonTypeForSpark2(dataType)
     }
   }
 
   def convertToCarbonTypeForSpark2(dataType: String): DataType = {
     dataType.toLowerCase match {
-      case "stringtype" => DataType.STRING
-      case "inttype" => DataType.INT
-      case "integertype" => DataType.INT
-      case "tinyinttype" => DataType.SHORT
-      case "shorttype" => DataType.SHORT
-      case "longtype" => DataType.LONG
-      case "biginttype" => DataType.LONG
-      case "numerictype" => DataType.DOUBLE
-      case "doubletype" => DataType.DOUBLE
-      case "floattype" => DataType.DOUBLE
-      case "decimaltype" => DataType.DECIMAL
-      case FIXED_DECIMALTYPE(_, _) => DataType.DECIMAL
-      case "timestamptype" => DataType.TIMESTAMP
-      case "datetype" => DataType.DATE
+      case "stringtype" => DataTypes.STRING
+      case "inttype" => DataTypes.INT
+      case "integertype" => DataTypes.INT
+      case "tinyinttype" => DataTypes.SHORT
+      case "shorttype" => DataTypes.SHORT
+      case "longtype" => DataTypes.LONG
+      case "biginttype" => DataTypes.LONG
+      case "numerictype" => DataTypes.DOUBLE
+      case "doubletype" => DataTypes.DOUBLE
+      case "floattype" => DataTypes.DOUBLE
+      case "decimaltype" => DataTypes.DECIMAL
+      case FIXED_DECIMALTYPE(_, _) => DataTypes.DECIMAL
+      case "timestamptype" => DataTypes.TIMESTAMP
+      case "datetype" => DataTypes.DATE
       case others =>
         if (others != null && others.startsWith("arraytype")) {
-          DataType.ARRAY
+          DataTypes.ARRAY
         } else if (others != null && others.startsWith("structtype")) {
-          DataType.STRUCT
+          DataTypes.STRUCT
         } else if (others != null && others.startsWith("char")) {
-          DataType.STRING
+          DataTypes.STRING
         } else if (others != null && others.startsWith("varchar")) {
-          DataType.STRING
+          DataTypes.STRING
         } else {
           sys.error(s"Unsupported data type: $dataType")
         }
@@ -79,17 +79,17 @@ object DataTypeConverterUtil {
 
   def convertToString(dataType: DataType): String = {
     dataType match {
-      case DataType.STRING => "string"
-      case DataType.SHORT => "smallint"
-      case DataType.INT => "int"
-      case DataType.LONG => "bigint"
-      case DataType.DOUBLE => "double"
-      case DataType.FLOAT => "double"
-      case DataType.DECIMAL => "decimal"
-      case DataType.TIMESTAMP => "timestamp"
-      case DataType.DATE => "date"
-      case DataType.ARRAY => "array"
-      case DataType.STRUCT => "struct"
+      case DataTypes.STRING => "string"
+      case DataTypes.SHORT => "smallint"
+      case DataTypes.INT => "int"
+      case DataTypes.LONG => "bigint"
+      case DataTypes.DOUBLE => "double"
+      case DataTypes.FLOAT => "double"
+      case DataTypes.DECIMAL => "decimal"
+      case DataTypes.TIMESTAMP => "timestamp"
+      case DataTypes.DATE => "date"
+      case DataTypes.ARRAY => "array"
+      case DataTypes.STRUCT => "struct"
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index c121960..ddc4763 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -43,7 +43,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.reader.CarbonDictionaryReader
@@ -260,11 +260,11 @@ object GlobalDictionaryUtil {
         None
       case Some(dim) =>
         dim.getDataType match {
-          case DataType.ARRAY =>
+          case DataTypes.ARRAY =>
             val arrDim = ArrayParser(dim, format)
             generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, arrDim)
             Some(arrDim)
-          case DataType.STRUCT =>
+          case DataTypes.STRUCT =>
             val stuDim = StructParser(dim, format)
             generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, stuDim)
             Some(stuDim)
@@ -478,7 +478,7 @@ object GlobalDictionaryUtil {
       // for Array, user set ArrayFiled: path, while ArrayField has a child Array.val
       val currentColName = {
         preDictDimension.getDataType match {
-          case DataType.ARRAY =>
+          case DataTypes.ARRAY =>
             if (children(0).isComplex) {
               "val." + colName.substring(middleDimName.length + 1)
             } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 7d4dd49..16301d6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -622,14 +622,14 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         dimFields += field
       } else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
         dimFields += field
-      } else if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase) == DataType.TIMESTAMP &&
+      } else if (DataTypeUtil.getDataType(field.dataType.get.toUpperCase) == DataTypes.TIMESTAMP &&
                  !dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
         noDictionaryDims :+= field.column
         dimFields += field
       } else if (isDetectAsDimentionDatatype(field.dataType.get)) {
         dimFields += field
         // consider all String cols as noDictionaryDims by default
-        if (DataType.STRING.getName.equalsIgnoreCase(field.dataType.get)) {
+        if (DataTypes.STRING.getName.equalsIgnoreCase(field.dataType.get)) {
           noDictionaryDims :+= field.column
         }
       } else if (sortKeyDimsTmp.exists(x => x.equalsIgnoreCase(field.column)) &&

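The parser hunk shows the two comparison idioms that replace enum equality: a direct `==` against a DataTypes constant for an already-resolved type, and a getName comparison for raw type strings coming from the DDL. A small sketch of both (hypothetical method names):

    import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}

    // Hypothetical sketch of the two idioms used above.
    def isTimestamp(resolved: DataType): Boolean =
      resolved == DataTypes.TIMESTAMP                        // resolved type: equality check

    def isStringByName(declared: String): Boolean =
      DataTypes.STRING.getName.equalsIgnoreCase(declared)    // raw DDL string: name check
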
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 1a39741..e5cfc84 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
@@ -309,10 +309,10 @@ class AlterTableColumnSchemaGenerator(
     if (alterTableModel.highCardinalityDims.contains(colName)) {
       encoders.remove(Encoding.DICTIONARY)
     }
-    if (dataType == DataType.DATE) {
+    if (dataType == DataTypes.DATE) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
-    if (dataType == DataType.TIMESTAMP && !alterTableModel.highCardinalityDims.contains(colName)) {
+    if (dataType == DataTypes.TIMESTAMP && !alterTableModel.highCardinalityDims.contains(colName)) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
     val colPropMap = new java.util.HashMap[String, String]()
@@ -378,10 +378,10 @@ class TableNewProcessor(cm: TableModel) {
     if (highCardinalityDims.contains(colName)) {
       encoders.remove(Encoding.DICTIONARY)
     }
-    if (dataType == DataType.DATE) {
+    if (dataType == DataTypes.DATE) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
-    if (dataType == DataType.TIMESTAMP && !highCardinalityDims.contains(colName)) {
+    if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
     columnSchema.setEncodingList(encoders)
@@ -508,7 +508,7 @@ class TableNewProcessor(cm: TableModel) {
     // Adding dummy measure if no measure is provided
     if (measureCount == 0) {
       val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(DataType.DOUBLE,
+      val columnSchema: ColumnSchema = getColumnSchema(DataTypes.DOUBLE,
         CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
         index,
         true,
@@ -537,7 +537,7 @@ class TableNewProcessor(cm: TableModel) {
         val col = allColumns.find(_.getColumnName.equalsIgnoreCase(b))
         col match {
           case Some(colSchema: ColumnSchema) =>
-            if (colSchema.isDimensionColumn && !colSchema.isComplex) {
+            if (colSchema.isDimensionColumn && !colSchema.getDataType.isComplexType) {
               colSchema
             } else {
               LOGGER.error(s"Bucket field must be dimension column and " +

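Two things change in this file: the DATE/TIMESTAMP checks move to DataTypes constants, and the schema-level isComplex flag gives way to asking the type itself via isComplexType. A hedged condensation of the encoder decision (hypothetical function, assuming those APIs):

    import java.util
    import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
    import org.apache.carbondata.core.metadata.encoder.Encoding

    // Hypothetical condensation of the direct-dictionary logic above.
    def directDictionaryEncoders(dataType: DataType, highCardinality: Boolean): util.List[Encoding] = {
      val encoders = new util.ArrayList[Encoding]()
      if (dataType == DataTypes.DATE) {
        encoders.add(Encoding.DIRECT_DICTIONARY)
      }
      if (dataType == DataTypes.TIMESTAMP && !highCardinality) {
        encoders.add(Encoding.DIRECT_DICTIONARY)
      }
      encoders
    }
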
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 62767fd..92c8402 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -21,7 +21,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -52,27 +52,21 @@ public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
         if (data[i] == null) {
           continue;
         }
-        switch (dataTypes[i]) {
-          case STRING:
-            data[i] = UTF8String.fromString(data[i].toString());
-            break;
-          case TIMESTAMP:
-            data[i] = new Timestamp((long) data[i]);
-            break;
-          case DATE:
-            data[i] = new Date((long) data[i]);
-            break;
-          case LONG:
-            data[i] = data[i];
-            break;
-          default:
+        if (dataTypes[i] == DataTypes.STRING) {
+          data[i] = UTF8String.fromString(data[i].toString());
+        } else if (dataTypes[i] == DataTypes.TIMESTAMP) {
+          data[i] = new Timestamp((long) data[i]);
+        } else if (dataTypes[i] == DataTypes.DATE) {
+          data[i] = new Date((long) data[i]);
+        } else if (dataTypes[i] == DataTypes.LONG) {
+          data[i] = data[i];
         }
       }
       else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         //convert the long to timestamp in case of direct dictionary column
-        if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
+        if (DataTypes.TIMESTAMP == carbonColumns[i].getDataType()) {
           data[i] = new Timestamp((long) data[i] / 1000L);
-        } else if (DataType.DATE == carbonColumns[i].getDataType()) {
+        } else if (DataTypes.DATE == carbonColumns[i].getDataType()) {
           data[i] = new Date((long) data[i]);
         }
       }

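Java's switch only accepts enums, primitives, and strings, so turning DataType into a class forces the switch above into an if/else chain; comparing with == stays safe as long as DataTypes exposes each type as a singleton instance. The same dispatch in Scala, under that singleton assumption (hypothetical converter; note the LONG branch of the original is a self-assignment carried over from the old switch, so it reduces to the identity):

    import java.sql.{Date, Timestamp}
    import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}

    // Hypothetical converter mirroring the chain above.
    def convert(value: Any, dataType: DataType): Any =
      if (dataType == DataTypes.TIMESTAMP) new Timestamp(value.asInstanceOf[Long])
      else if (dataType == DataTypes.DATE) new Date(value.asInstanceOf[Long])
      else if (dataType == DataTypes.STRING) value.toString
      else value                               // LONG and everything else pass through
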
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index 0d1b1df..7881b93 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonType}
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
 
 class CarbonDataFrameWriter(val dataFrame: DataFrame) {
 
@@ -161,8 +161,8 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
     sparkType match {
       case StringType => CarbonType.STRING.getName
       case IntegerType => CarbonType.INT.getName
-      case ShortType => CarbonType.SHORT.getName
-      case LongType => CarbonType.LONG.getName
+      case ShortType => "smallint"
+      case LongType => "bigint"
       case FloatType => CarbonType.DOUBLE.getName
       case DoubleType => CarbonType.DOUBLE.getName
       case TimestampType => CarbonType.TIMESTAMP.getName

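The writer builds a CREATE TABLE string from the DataFrame schema, so it needs parser-friendly SQL keywords. The switch to the literal "smallint"/"bigint" suggests the new SHORT and LONG constants' getName no longer yields the keyword the DDL parser accepts; that is an inference from this substitution, not something the diff states. A sketch of the mapping under that assumption:

    import org.apache.spark.sql.types._
    import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}

    // Sketch, assuming getName is parser-friendly for all but short/long.
    def toCarbonTypeName(sparkType: DataType): String = sparkType match {
      case StringType             => CarbonType.STRING.getName
      case IntegerType            => CarbonType.INT.getName
      case ShortType              => "smallint"   // spelled out, per the substitution above
      case LongType               => "bigint"
      case FloatType | DoubleType => CarbonType.DOUBLE.getName
      case TimestampType          => CarbonType.TIMESTAMP.getName
      case other                  => throw new IllegalArgumentException(s"unsupported type: $other")
    }
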
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cf14a79..6eeeaf9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -1047,7 +1047,7 @@ object CarbonDataRDDFactory {
     }
 
     val partitioner = PartitionFactory.getPartitioner(partitionInfo)
-    if (partitionColumnDataType == DataType.STRING) {
+    if (partitionColumnDataType == DataTypes.STRING) {
       if (partitionInfo.getPartitionType == PartitionType.RANGE) {
         inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
           .partitionBy(partitioner)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 2f65fbc..118249a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -38,7 +38,7 @@ object CarbonSparkUtil {
       carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
             f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-                !CarbonUtil.hasComplexDataType(f.getDataType))
+                !f.getDataType.isComplexType)
       }
     CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 7e449b6..2e93a6c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -162,7 +162,7 @@ case class CarbonRelation(
     extends LeafNode with MultiInstanceRelation {
 
   def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
-    childDim.getDataType.toString.toLowerCase match {
+    childDim.getDataType.getName.toLowerCase match {
       case "array" => s"${
         childDim.getColName.substring(dimName.length + 1)
       }:array<${ getArrayChildren(childDim.getColName) }>"
@@ -175,7 +175,7 @@ case class CarbonRelation(
 
   def getArrayChildren(dimName: String): String = {
     metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.toString.toLowerCase match {
+      childDim.getDataType.getName.toLowerCase match {
         case "array" => s"array<${ getArrayChildren(childDim.getColName) }>"
         case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>"
         case dType => addDecimalScaleAndPrecision(childDim, dType)
@@ -185,7 +185,7 @@ case class CarbonRelation(
 
   def getStructChildren(dimName: String): String = {
     metaData.carbonTable.getChildren(dimName).asScala.map(childDim => {
-      childDim.getDataType.toString.toLowerCase match {
+      childDim.getDataType.getName.toLowerCase match {
         case "array" => s"${
           childDim.getColName.substring(dimName.length + 1)
         }:array<${ getArrayChildren(childDim.getColName) }>"
@@ -213,8 +213,7 @@ case class CarbonRelation(
       .map(dim => {
       val dimval = metaData.carbonTable
           .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
-      val output: DataType = dimval.getDataType
-          .toString.toLowerCase match {
+      val output: DataType = dimval.getDataType.getName.toLowerCase match {
         case "array" =>
           CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
         case "struct" =>
@@ -238,7 +237,7 @@ case class CarbonRelation(
           getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
           asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
         .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
-          metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
+          metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.getName
               .toLowerCase match {
             case "float" => "double"
             case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
@@ -252,7 +251,7 @@ case class CarbonRelation(
         .asScala
     columns.filter(!_.isInvisible).map { column =>
       if (column.isDimension()) {
-        val output: DataType = column.getDataType.toString.toLowerCase match {
+        val output: DataType = column.getDataType.getName.toLowerCase match {
           case "array" =>
             CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
           case "struct" =>
@@ -266,8 +265,7 @@ case class CarbonRelation(
         )(qualifiers = tableName +: alias.toSeq)
       } else {
         AttributeReference(column.getColName, CarbonMetastoreTypes.toDataType(
-          column.getDataType.toString
-            .toLowerCase match {
+          column.getDataType.getName.toLowerCase match {
             case "float" => "double"
             case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column
               .getColumnSchema.getScale + ")"
@@ -292,7 +290,7 @@ case class CarbonRelation(
 
   def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
     var dType = dataType
-    if (dimval.getDataType == org.apache.carbondata.core.metadata.datatype.DataType.DECIMAL) {
+    if (dimval.getDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL) {
       dType +=
           "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
     }

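Every toString.toLowerCase call site in this file becomes getName.toLowerCase: an enum's toString defaults to its constant name, but a class's toString carries no such guarantee, so the explicit accessor is the safe spelling. A sketch of the name-based dispatch under that assumption (hypothetical helper):

    import org.apache.carbondata.core.metadata.datatype.DataType

    // Hypothetical: derive the metastore type token the way the relation does.
    def metastoreToken(dataType: DataType, precision: Int, scale: Int): String =
      dataType.getName.toLowerCase match {
        case "float"   => "double"                       // widen float, as above
        case "decimal" => s"decimal($precision,$scale)"
        case other     => other
      }
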
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 37505d0..c14a61a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.types._
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -96,13 +97,13 @@ case class CarbonDictionaryDecoder(
   def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
       relation: CarbonRelation): types.DataType = {
     carbonDimension.getDataType match {
-      case DataType.STRING => StringType
-      case DataType.SHORT => ShortType
-      case DataType.INT => IntegerType
-      case DataType.LONG => LongType
-      case DataType.DOUBLE => DoubleType
-      case DataType.BOOLEAN => BooleanType
-      case DataType.DECIMAL =>
+      case CarbonDataTypes.STRING => StringType
+      case CarbonDataTypes.SHORT => ShortType
+      case CarbonDataTypes.INT => IntegerType
+      case CarbonDataTypes.LONG => LongType
+      case CarbonDataTypes.DOUBLE => DoubleType
+      case CarbonDataTypes.BOOLEAN => BooleanType
+      case CarbonDataTypes.DECIMAL =>
         val scale: Int = carbonDimension.getColumnSchema.getScale
         val precision: Int = carbonDimension.getColumnSchema.getPrecision
         if (scale == 0 && precision == 0) {
@@ -110,12 +111,12 @@ case class CarbonDictionaryDecoder(
         } else {
           DecimalType(precision, scale)
         }
-      case DataType.TIMESTAMP => TimestampType
-      case DataType.DATE => DateType
-      case DataType.STRUCT =>
+      case CarbonDataTypes.TIMESTAMP => TimestampType
+      case CarbonDataTypes.DATE => DateType
+      case CarbonDataTypes.STRUCT =>
         CarbonMetastoreTypes
           .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
-      case DataType.ARRAY =>
+      case CarbonDataTypes.ARRAY =>
         CarbonMetastoreTypes
           .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
     }

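Because org.apache.spark.sql.types defines its own DataType, the decoder renames the Carbon import to CarbonDataTypes to keep both in scope; the match itself keeps its shape. A trimmed sketch of the conversion with that alias (primitive branches only, assuming the constants shown above):

    import org.apache.spark.sql.types._
    import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}

    // Trimmed sketch of convertCarbonToSparkDataType's primitive branches.
    def toSparkType(carbonType: org.apache.carbondata.core.metadata.datatype.DataType): DataType =
      carbonType match {
        case CarbonDataTypes.STRING    => StringType
        case CarbonDataTypes.INT       => IntegerType
        case CarbonDataTypes.LONG      => LongType
        case CarbonDataTypes.DOUBLE    => DoubleType
        case CarbonDataTypes.TIMESTAMP => TimestampType
        case _ => throw new IllegalArgumentException("complex/decimal types need schema info")
      }
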
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
index c827827..bc62a55 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
@@ -39,7 +39,7 @@ object CarbonSparkUtil {
       carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
             f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-                !CarbonUtil.hasComplexDataType(f.getDataType))
+                !f.getDataType.isComplexType)
       }
     CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 8fc367b..2e45954 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
@@ -120,9 +120,9 @@ object CarbonFilters {
     def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
       val dataTypeOfAttribute = CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name))
       val dataType = if (Option(value).isDefined
-                         && dataTypeOfAttribute == DataType.STRING
+                         && dataTypeOfAttribute == CarbonDataTypes.STRING
                          && value.isInstanceOf[Double]) {
-        DataType.DOUBLE
+        CarbonDataTypes.DOUBLE
       } else {
         dataTypeOfAttribute
       }
@@ -410,11 +410,11 @@ object CarbonFilters {
     } else {
       carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
       carbonColumn.getDataType match {
-        case DataType.INT => DataType.INT
-        case DataType.SHORT => DataType.SHORT
-        case DataType.LONG => DataType.LONG
-        case DataType.DECIMAL => DataType.DECIMAL
-        case _ => DataType.DOUBLE
+        case CarbonDataTypes.INT => CarbonDataTypes.INT
+        case CarbonDataTypes.SHORT => CarbonDataTypes.SHORT
+        case CarbonDataTypes.LONG => CarbonDataTypes.LONG
+        case CarbonDataTypes.DECIMAL => CarbonDataTypes.DECIMAL
+        case _ => CarbonDataTypes.DOUBLE
       }
     }
     CarbonScalaUtil.convertCarbonToSparkDataType(dataType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index e1946a9..1183d94 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
@@ -198,28 +199,24 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
             null);
       } else {
         fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
-            CarbonScalaUtil.convertCarbonToSparkDataType(DataType.INT), true, null);
+            CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null);
       }
     }
 
     for (int i = 0; i < queryMeasures.size(); i++) {
       QueryMeasure msr = queryMeasures.get(i);
-      switch (msr.getMeasure().getDataType()) {
-        case SHORT:
-        case INT:
-        case LONG:
-          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-              CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
-              null);
-          break;
-        case DECIMAL:
-          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-              new DecimalType(msr.getMeasure().getPrecision(),
-                  msr.getMeasure().getScale()), true, null);
-          break;
-        default:
-          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
-              CarbonScalaUtil.convertCarbonToSparkDataType(DataType.DOUBLE), true, null);
+      DataType dataType = msr.getMeasure().getDataType();
+      if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
+        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+            CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
+            null);
+      } else if (dataType == DataTypes.DECIMAL) {
+        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+            new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
+            null);
+      } else {
+        fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+            CarbonScalaUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
       }
     }
 

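Same switch-to-chain conversion as in the read support, with the measure's type hoisted into a local so it is fetched once; the SHORT/INT/LONG grouping that the old fall-through cases expressed becomes an explicit disjunction. A hypothetical Scala rendering of the measure-field branch (direct mapping instead of CarbonScalaUtil, for brevity):

    import org.apache.spark.sql.types.{DataType => SparkDataType, DecimalType, DoubleType, IntegerType, LongType, ShortType}
    import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}

    // Hypothetical sketch of the measure StructField type selection above.
    def measureSparkType(dataType: DataType, precision: Int, scale: Int): SparkDataType =
      if (dataType == DataTypes.SHORT) ShortType
      else if (dataType == DataTypes.INT) IntegerType
      else if (dataType == DataTypes.LONG) LongType
      else if (dataType == DataTypes.DECIMAL) DecimalType(precision, scale)
      else DoubleType   // everything else widens to double, matching the default branch
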
http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5b76c79..87de8ae 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -46,7 +46,7 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnarFormatVersion}
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -1233,7 +1233,7 @@ object CarbonDataRDDFactory {
     }
 
     val partitioner = PartitionFactory.getPartitioner(partitionInfo)
-    if (partitionColumnDataType == DataType.STRING) {
+    if (partitionColumnDataType == DataTypes.STRING) {
       if (partitionInfo.getPartitionType == PartitionType.RANGE) {
         inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
           .partitionBy(partitioner)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index de7f3fb..5dd5983 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -40,7 +40,7 @@ object CarbonSparkUtil {
       carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
             f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-                !CarbonUtil.hasComplexDataType(f.getDataType))
+                !f.getDataType.isComplexType)
       }
     CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/956833e5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index a12d86b..98a37fa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonType}
+import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
 import org.apache.carbondata.spark.CarbonOption
 
 class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {