Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 04:15:45 UTC

[11/24] carbondata git commit: [CARBONDATA-1594] Add precision and scale to DecimalType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e5cfc84..285abf4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
@@ -207,16 +207,15 @@ class AlterTableColumnSchemaGenerator(
     alterTableModel.dimCols.foreach(field => {
       val encoders = new java.util.ArrayList[Encoding]()
       encoders.add(Encoding.DICTIONARY)
-      val columnSchema: ColumnSchema = getColumnSchema(
-        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-        field.name.getOrElse(field.column),
-        isCol = true,
+      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+        field,
         encoders,
         isDimensionCol = true,
-        -1,
         field.precision,
         field.scale,
         field.schemaOrdinal + existingColsSize,
+        alterTableModel.highCardinalityDims,
+        alterTableModel.databaseName.getOrElse(dbName),
         isSortColumn(field.name.getOrElse(field.column)))
       allColumns ++= Seq(columnSchema)
       newCols ++= Seq(columnSchema)
@@ -225,17 +224,16 @@ class AlterTableColumnSchemaGenerator(
     allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
     alterTableModel.msrCols.foreach(field => {
       val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(
-        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-        field.name.getOrElse(field.column),
-        isCol = true,
+      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+        field,
         encoders,
         isDimensionCol = false,
-        -1,
         field.precision,
         field.scale,
         field.schemaOrdinal + existingColsSize,
-        false)
+        alterTableModel.highCardinalityDims,
+        alterTableModel.databaseName.getOrElse(dbName)
+      )
       allColumns ++= Seq(columnSchema)
       newCols ++= Seq(columnSchema)
     })
@@ -299,33 +297,49 @@ class AlterTableColumnSchemaGenerator(
     newCols
   }
 
-  private def getColumnSchema(dataType: DataType, colName: String, isCol: Boolean,
-      encoders: java.util.List[Encoding], isDimensionCol: Boolean,
-      colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int,
-      isSortColumn: Boolean): ColumnSchema = {
+}
+
+// TODO: move this to carbon store API
+object TableNewProcessor {
+  def apply(cm: TableModel): TableInfo = {
+    new TableNewProcessor(cm).process
+  }
+
+  def createColumnSchema(
+      field: Field,
+      encoders: java.util.List[Encoding],
+      isDimensionCol: Boolean,
+      precision: Int,
+      scale: Int,
+      schemaOrdinal: Int,
+      highCardinalityDims: Seq[String],
+      databaseName: String,
+      isSortColumn: Boolean = false): ColumnSchema = {
+    val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse(""))
+    if (DataTypes.isDecimal(dataType)) {
+      dataType.asInstanceOf[DecimalType].setPrecision(field.precision)
+      dataType.asInstanceOf[DecimalType].setScale(field.scale)
+    }
     val columnSchema = new ColumnSchema()
     columnSchema.setDataType(dataType)
+    val colName = field.name.getOrElse(field.column)
     columnSchema.setColumnName(colName)
-    if (alterTableModel.highCardinalityDims.contains(colName)) {
+    if (highCardinalityDims.contains(colName)) {
       encoders.remove(Encoding.DICTIONARY)
     }
     if (dataType == DataTypes.DATE) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
-    if (dataType == DataTypes.TIMESTAMP && !alterTableModel.highCardinalityDims.contains(colName)) {
+    if (dataType == DataTypes.TIMESTAMP && ! highCardinalityDims.contains(colName)) {
       encoders.add(Encoding.DIRECT_DICTIONARY)
     }
-    val colPropMap = new java.util.HashMap[String, String]()
     columnSchema.setEncodingList(encoders)
     val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
-    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(
-      alterTableModel.databaseName.getOrElse(dbName),
-      columnSchema)
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(databaseName, columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
-    columnSchema.setColumnar(isCol)
+    columnSchema.setColumnar(true)
     columnSchema.setDimensionColumn(isDimensionCol)
-    columnSchema.setColumnGroup(colGroup)
     columnSchema.setPrecision(precision)
     columnSchema.setScale(scale)
     columnSchema.setSchemaOrdinal(schemaOrdinal)
@@ -334,31 +348,25 @@ class AlterTableColumnSchemaGenerator(
     columnSchema
   }
 }
-object TableNewProcessor {
-  def apply(cm: TableModel): TableInfo = {
-    new TableNewProcessor(cm).process
-  }
-}
 
 class TableNewProcessor(cm: TableModel) {
 
-  var index = 0
-  var rowGroup = 0
-
   def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
     fieldChildren.foreach(fields => {
       fields.foreach(field => {
         val encoders = new java.util.ArrayList[Encoding]()
         encoders.add(Encoding.DICTIONARY)
-        val columnSchema: ColumnSchema = getColumnSchema(
-          DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-          field.name.getOrElse(field.column), index,
-          isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale,
-          field.schemaOrdinal)
+        val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+          field,
+          encoders,
+          isDimensionCol = true,
+          field.precision,
+          field.scale,
+          field.schemaOrdinal,
+          cm.highcardinalitydims.getOrElse(Seq()),
+          cm.databaseName)
         allColumns ++= Seq(columnSchema)
-        index = index + 1
-        rowGroup = rowGroup + 1
         if (field.children.get != null) {
           columnSchema.setNumberOfChild(field.children.get.size)
           allColumns ++= getAllChildren(field.children)
@@ -368,39 +376,6 @@ class TableNewProcessor(cm: TableModel) {
     allColumns
   }
 
-  def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
-      encoders: java.util.List[Encoding], isDimensionCol: Boolean,
-      colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
-    val columnSchema = new ColumnSchema()
-    columnSchema.setDataType(dataType)
-    columnSchema.setColumnName(colName)
-    val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
-    if (highCardinalityDims.contains(colName)) {
-      encoders.remove(Encoding.DICTIONARY)
-    }
-    if (dataType == DataTypes.DATE) {
-      encoders.add(Encoding.DIRECT_DICTIONARY)
-    }
-    if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) {
-      encoders.add(Encoding.DIRECT_DICTIONARY)
-    }
-    columnSchema.setEncodingList(encoders)
-    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
-    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
-      columnSchema)
-    columnSchema.setColumnUniqueId(columnUniqueId)
-    columnSchema.setColumnReferenceId(columnUniqueId)
-    columnSchema.setColumnar(isCol)
-    columnSchema.setDimensionColumn(isDimensionCol)
-    columnSchema.setColumnGroup(colGroup)
-    columnSchema.setPrecision(precision)
-    columnSchema.setScale(scale)
-    columnSchema.setSchemaOrdinal(schemaOrdinal)
-    columnSchema.setSortColumn(false)
-    // TODO: Need to fill RowGroupID, converted type
-    // & Number of Children after DDL finalization
-    columnSchema
-  }
 
   // process create dml fields and create wrapper TableInfo object
   def process: TableInfo = {
@@ -414,38 +389,34 @@ class TableNewProcessor(cm: TableModel) {
       val field = cm.dimCols.find(keyDim equals _.column).get
       val encoders = new java.util.ArrayList[Encoding]()
       encoders.add(Encoding.DICTIONARY)
-      val columnSchema: ColumnSchema = getColumnSchema(
-        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-        field.name.getOrElse(field.column),
-        index,
-        isCol = true,
+      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+        field,
         encoders,
         isDimensionCol = true,
-        -1,
         field.precision,
         field.scale,
-        field.schemaOrdinal)
+        field.schemaOrdinal,
+        cm.highcardinalitydims.getOrElse(Seq()),
+        cm.databaseName)
       columnSchema.setSortColumn(true)
       allColumns :+= columnSchema
       index = index + 1
     }
 
-    cm.dimCols.foreach(field => {
+    cm.dimCols.foreach { field =>
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
         val encoders = new java.util.ArrayList[Encoding]()
         encoders.add(Encoding.DICTIONARY)
-        val columnSchema: ColumnSchema = getColumnSchema(
-          DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-          field.name.getOrElse(field.column),
-          index,
-          isCol = true,
+        val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+          field,
           encoders,
           isDimensionCol = true,
-          -1,
           field.precision,
           field.scale,
-          field.schemaOrdinal)
+          field.schemaOrdinal,
+          cm.highcardinalitydims.getOrElse(Seq()),
+          cm.databaseName)
         allColumns :+= columnSchema
         index = index + 1
         if (field.children.isDefined && field.children.get != null) {
@@ -453,37 +424,37 @@ class TableNewProcessor(cm: TableModel) {
           allColumns ++= getAllChildren(field.children)
         }
       }
-    })
+    }
 
-    cm.msrCols.foreach(field => {
+    cm.msrCols.foreach { field =>
       val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(
-        DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
-        field.name.getOrElse(field.column),
-        index,
-        isCol = true,
+      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+        field,
         encoders,
         isDimensionCol = false,
-        -1,
         field.precision,
         field.scale,
-        field.schemaOrdinal)
+        field.schemaOrdinal,
+        cm.highcardinalitydims.getOrElse(Seq()),
+        cm.databaseName)
       allColumns :+= columnSchema
       index = index + 1
       measureCount += 1
-    })
+    }
 
     // Check if there is any duplicate measures or dimensions.
     // Its based on the dimension name and measure name
-    allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
-      val name = f._1
-      LOGGER.error(s"Duplicate column found with name: $name")
-      LOGGER.audit(
-        s"Validation failed for Create/Alter Table Operation " +
-            s"for ${ cm.databaseName }.${ cm.tableName }" +
-            s"Duplicate column found with name: $name")
-      sys.error(s"Duplicate dimensions found with name: $name")
-    })
+    allColumns.groupBy(_.getColumnName).foreach { f =>
+      if (f._2.size > 1) {
+        val name = f._1
+        LOGGER.error(s"Duplicate column found with name: $name")
+        LOGGER.audit(
+          s"Validation failed for Create/Alter Table Operation " +
+          s"for ${ cm.databaseName }.${ cm.tableName }" +
+          s"Duplicate column found with name: $name")
+        sys.error(s"Duplicate dimensions found with name: $name")
+      }
+    }
 
     val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
 
@@ -508,13 +479,21 @@ class TableNewProcessor(cm: TableModel) {
     // Adding dummy measure if no measure is provided
     if (measureCount == 0) {
       val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = getColumnSchema(DataTypes.DOUBLE,
+      val field = Field(
         CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
-        index,
-        true,
+        Some(DataTypes.DOUBLE.getName),
+        Some(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE),
+        None
+      )
+      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+        field,
         encoders,
-        false,
-        -1, 0, 0, schemaOrdinal = -1)
+        isDimensionCol = false,
+        field.precision,
+        field.scale,
+        -1,
+        cm.highcardinalitydims.getOrElse(Seq()),
+        cm.databaseName)
       columnSchema.setInvisible(true)
       allColumns :+= columnSchema
     }
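
The hunks above fold the two near-identical getColumnSchema helpers into a single TableNewProcessor.createColumnSchema, and this is where the PR's headline change lands: a column's declared precision and scale are now stamped onto the DecimalType instance itself, not just onto the ColumnSchema. A minimal Scala sketch of that flow, using only calls visible in the diff (the decimal(10, 2) values are illustrative):

    // inside createColumnSchema, for a field declared as decimal(10, 2)
    val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse(""))
    if (DataTypes.isDecimal(dataType)) {
      val decimal = dataType.asInstanceOf[DecimalType]
      decimal.setPrecision(field.precision)   // 10
      decimal.setScale(field.scale)           // 2
    }
    val columnSchema = new ColumnSchema()
    columnSchema.setDataType(dataType)        // the schema's type now remembers (10, 2)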

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 1d3783e..64b440b 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -211,7 +211,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
         fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
             CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
             null);
-      } else if (dataType == DataTypes.DECIMAL) {
+      } else if (DataTypes.isDecimal(dataType)) {
         fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
             new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
             null);
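
This one-line substitution recurs across the rest of the patch, and the reason is the change above: DataTypes.DECIMAL used to be a single shared constant, so reference equality was enough to identify decimal columns; once DecimalType instances carry per-column precision and scale, a column's type need not be that shared object, and the check has to match the whole decimal family. A hedged sketch of the contrast (values illustrative):

    // old check, valid only while DataTypes.DECIMAL was a shared constant:
    //   dataType == DataTypes.DECIMAL
    // new check, true for any decimal instance whatever its precision/scale:
    //   DataTypes.isDecimal(dataType)
    val dataType = msr.getMeasure.getDataType   // e.g. a decimal(10, 2) instance
    if (DataTypes.isDecimal(dataType)) {
      val sparkType = new DecimalType(msr.getMeasure.getPrecision, msr.getMeasure.getScale)
      // ... build the StructField with sparkType, as in the hunk above
    }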

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 98a37fa..44fbb37 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -151,8 +151,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       case DoubleType => CarbonType.DOUBLE.getName
       case TimestampType => CarbonType.TIMESTAMP.getName
       case DateType => CarbonType.DATE.getName
-      case decimal: DecimalType => s"${CarbonType.DECIMAL.getName} (${decimal.precision}" +
-                                   s", ${decimal.scale})"
+      case decimal: DecimalType => s"decimal(${decimal.precision}, ${decimal.scale})"
       case other => sys.error(s"unsupported type: $other")
     }
   }
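
The old interpolation built the type string from CarbonType.DECIMAL.getName with a space before the parenthesis, yielding something like "DECIMAL (10, 2)"; the replacement emits the conventional lowercase form directly. A sketch of the mapping (values illustrative):

    import org.apache.spark.sql.types.DecimalType

    val decimal = DecimalType(10, 2)
    val ddlType = s"decimal(${decimal.precision}, ${decimal.scale})"
    // ddlType == "decimal(10, 2)"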

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index ef89771..91c07de 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -164,46 +164,49 @@ case class CarbonDictionaryDecoder(
              |}
              """.stripMargin
 
-            val caseCode = getDictionaryColumnIds(index)._3.getDataType match {
-              case CarbonDataTypes.INT =>
-                s"""
-                   |int $value = Integer.parseInt(new String($valueIntern,
-                   |org.apache.carbondata.core.constants.CarbonCommonConstants
-                   |.DEFAULT_CHARSET_CLASS));
-                 """.stripMargin
-              case CarbonDataTypes.SHORT =>
-                s"""
-                   |short $value =
-                   |Short.parseShort(new String($valueIntern,
-                   |org.apache.carbondata.core.constants.CarbonCommonConstants
-                   |.DEFAULT_CHARSET_CLASS));
-                 """.stripMargin
-              case CarbonDataTypes.DOUBLE =>
-                s"""
-                   |double $value =
-                   |Double.parseDouble(new String($valueIntern,
-                   |org.apache.carbondata.core.constants.CarbonCommonConstants
-                   |.DEFAULT_CHARSET_CLASS));
-                 """.stripMargin
-              case CarbonDataTypes.LONG =>
-                s"""
-                   |long $value =
-                   |Long.parseLong(new String($valueIntern,
-                   |org.apache.carbondata.core.constants.CarbonCommonConstants
-                   |.DEFAULT_CHARSET_CLASS));
-                 """.stripMargin
-              case CarbonDataTypes.DECIMAL =>
+            val caseCode =
+              if (CarbonDataTypes.isDecimal(getDictionaryColumnIds(index)._3.getDataType)) {
                 s"""
                    |org.apache.spark.sql.types.Decimal $value =
                    |Decimal.apply(new java.math.BigDecimal(
                    |new String($valueIntern, org.apache.carbondata.core.constants
                    |.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
                  """.stripMargin
-              case _ =>
-                s"""
-                   | UTF8String $value = UTF8String.fromBytes($valueIntern);
+              } else {
+                getDictionaryColumnIds(index)._3.getDataType match {
+                  case CarbonDataTypes.INT =>
+                    s"""
+                       |int $value = Integer.parseInt(new String($valueIntern,
+                       |org.apache.carbondata.core.constants.CarbonCommonConstants
+                       |.DEFAULT_CHARSET_CLASS));
                  """.stripMargin
-            }
+                  case CarbonDataTypes.SHORT =>
+                    s"""
+                       |short $value =
+                       |Short.parseShort(new String($valueIntern,
+                       |org.apache.carbondata.core.constants.CarbonCommonConstants
+                       |.DEFAULT_CHARSET_CLASS));
+                 """.stripMargin
+                  case CarbonDataTypes.DOUBLE =>
+                    s"""
+                       |double $value =
+                       |Double.parseDouble(new String($valueIntern,
+                       |org.apache.carbondata.core.constants.CarbonCommonConstants
+                       |.DEFAULT_CHARSET_CLASS));
+                 """.stripMargin
+                  case CarbonDataTypes.LONG =>
+                    s"""
+                       |long $value =
+                       |Long.parseLong(new String($valueIntern,
+                       |org.apache.carbondata.core.constants.CarbonCommonConstants
+                       |.DEFAULT_CHARSET_CLASS));
+                 """.stripMargin
+                  case _ =>
+                    s"""
+                       | UTF8String $value = UTF8String.fromBytes($valueIntern);
+                 """.stripMargin
+                }
+              }
           code +=
             s"""
                |$caseCode
@@ -381,29 +384,31 @@ object CarbonDictionaryDecoder {
    */
   def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
       relation: CarbonRelation): types.DataType = {
-    carbonDimension.getDataType match {
-      case CarbonDataTypes.STRING => StringType
-      case CarbonDataTypes.SHORT => ShortType
-      case CarbonDataTypes.INT => IntegerType
-      case CarbonDataTypes.LONG => LongType
-      case CarbonDataTypes.DOUBLE => DoubleType
-      case CarbonDataTypes.BOOLEAN => BooleanType
-      case CarbonDataTypes.DECIMAL =>
-        val scale: Int = carbonDimension.getColumnSchema.getScale
-        val precision: Int = carbonDimension.getColumnSchema.getPrecision
-        if (scale == 0 && precision == 0) {
-          DecimalType(18, 2)
-        } else {
-          DecimalType(precision, scale)
-        }
-      case CarbonDataTypes.TIMESTAMP => TimestampType
-      case CarbonDataTypes.DATE => DateType
-      case CarbonDataTypes.STRUCT =>
-        CarbonMetastoreTypes
-          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
-      case CarbonDataTypes.ARRAY =>
-        CarbonMetastoreTypes
-          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+    if (CarbonDataTypes.isDecimal(carbonDimension.getDataType)) {
+      val scale: Int = carbonDimension.getColumnSchema.getScale
+      val precision: Int = carbonDimension.getColumnSchema.getPrecision
+      if (scale == 0 && precision == 0) {
+        DecimalType(18, 2)
+      } else {
+        DecimalType(precision, scale)
+      }
+    } else {
+      carbonDimension.getDataType match {
+        case CarbonDataTypes.STRING => StringType
+        case CarbonDataTypes.SHORT => ShortType
+        case CarbonDataTypes.INT => IntegerType
+        case CarbonDataTypes.LONG => LongType
+        case CarbonDataTypes.DOUBLE => DoubleType
+        case CarbonDataTypes.BOOLEAN => BooleanType
+        case CarbonDataTypes.TIMESTAMP => TimestampType
+        case CarbonDataTypes.DATE => DateType
+        case CarbonDataTypes.STRUCT =>
+          CarbonMetastoreTypes
+            .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+        case CarbonDataTypes.ARRAY =>
+          CarbonMetastoreTypes
+            .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+      }
     }
   }
 
@@ -464,34 +469,6 @@ class CarbonDecoderRDD(
     }
   }
 
-  def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
-      relation: CarbonRelation): types.DataType = {
-    carbonDimension.getDataType match {
-      case CarbonDataTypes.STRING => StringType
-      case CarbonDataTypes.SHORT => ShortType
-      case CarbonDataTypes.INT => IntegerType
-      case CarbonDataTypes.LONG => LongType
-      case CarbonDataTypes.DOUBLE => DoubleType
-      case CarbonDataTypes.BOOLEAN => BooleanType
-      case CarbonDataTypes.DECIMAL =>
-        val scale: Int = carbonDimension.getColumnSchema.getScale
-        val precision: Int = carbonDimension.getColumnSchema.getPrecision
-        if (scale == 0 && precision == 0) {
-          DecimalType(18, 2)
-        } else {
-          DecimalType(precision, scale)
-        }
-      case CarbonDataTypes.TIMESTAMP => TimestampType
-      case CarbonDataTypes.DATE => DateType
-      case CarbonDataTypes.STRUCT =>
-        CarbonMetastoreTypes
-          .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
-      case CarbonDataTypes.ARRAY =>
-        CarbonMetastoreTypes
-          .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
-    }
-  }
-
   val getDictionaryColumnIds = {
     val dictIds: Array[(String, ColumnIdentifier, CarbonDimension)] = output.map { a =>
       val attr = aliasMap.getOrElse(a, a)
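
Two things happen in this file: the decimal branch is hoisted out of the pattern match into a DataTypes.isDecimal check, consistent with the rest of the patch, and a verbatim duplicate of convertCarbonToSparkDataType is deleted from CarbonDecoderRDD in favor of the companion object's version. Worth noting is the fallback in the decimal branch: a column whose schema recorded neither precision nor scale maps to Spark's DecimalType(18, 2). A condensed sketch, with dim standing in for a CarbonDimension:

    val scale: Int = dim.getColumnSchema.getScale
    val precision: Int = dim.getColumnSchema.getPrecision
    val sparkType =
      if (scale == 0 && precision == 0) {
        DecimalType(18, 2)   // legacy columns that recorded neither precision nor scale
      } else {
        DecimalType(precision, scale)
      }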

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 6bac0da..2c476ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.sql.hive
 
-import java.util
 import java.util.LinkedHashSet
 
 import scala.Array.canBuildFrom
@@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -164,7 +163,7 @@ case class CarbonRelation(
 
   def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
     var dType = dataType
-    if (dimval.getDataType == DECIMAL) {
+    if (DataTypes.isDecimal(dimval.getDataType)) {
       dType +=
       "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
     }
@@ -184,7 +183,7 @@ case class CarbonRelation(
 
   def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
     var dType = dataType
-    if (dimval.getDataType == DECIMAL) {
+    if (DataTypes.isDecimal(dimval.getDataType)) {
       dType +=
       "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
     }
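
Same family check here; the helper appends the stored precision and scale when building the Hive-facing type string. Illustrative effect (values assumed):

    // for a dimension stored as decimal(10, 2):
    addDecimalScaleAndPrecision(dimval, "decimal")   // returns "decimal(10,2)"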

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
index a41f734..f24d24f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
@@ -42,7 +42,7 @@ public class HashPartitionerImpl implements Partitioner<Object[]> {
       if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
         hashes[i] = new IntegralHash(indexes.get(i));
       } else if (dataType == DataTypes.DOUBLE || dataType == DataTypes.FLOAT ||
-          dataType == DataTypes.DECIMAL) {
+          DataTypes.isDecimal(dataType)) {
         hashes[i] = new DecimalHash(indexes.get(i));
       } else {
         hashes[i] = new StringHash(indexes.get(i));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 1d8f941..e5583c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -147,7 +147,7 @@ public class UnsafeCarbonRowPage {
           Double doubleVal = (Double) value;
           CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
           size += 8;
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           BigDecimal decimalVal = (BigDecimal) value;
           byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
           CarbonUnsafe.getUnsafe()
@@ -233,7 +233,7 @@ public class UnsafeCarbonRowPage {
           Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
           size += 8;
           rowToFill[dimensionSize + mesCount] = doubleVal;
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
           byte[] bigDecimalInBytes = new byte[aShort];
           size += 2;
@@ -315,7 +315,7 @@ public class UnsafeCarbonRowPage {
           double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
           size += 8;
           stream.writeDouble(doubleVal);
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
           byte[] bigDecimalInBytes = new byte[aShort];
           size += 2;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 404a521..3972b1c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -335,7 +335,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
             row[dimensionCount + mesCount] = stream.readLong();
           } else if (dataType == DataTypes.DOUBLE) {
             row[dimensionCount + mesCount] = stream.readDouble();
-          } else if (dataType == DataTypes.DECIMAL) {
+          } else if (DataTypes.isDecimal(dataType)) {
             short aShort = stream.readShort();
             byte[] bigDecimalInBytes = new byte[aShort];
             stream.readFully(bigDecimalInBytes);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 9f7d6c3..0c71adc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -329,7 +329,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
         } else if (dataType == DataTypes.DOUBLE) {
           rowData.putDouble(size, (Double) value);
           size += 8;
-        } else if (dataType == DataTypes.DECIMAL) {
+        } else if (DataTypes.isDecimal(dataType)) {
           byte[] bigDecimalInBytes = (byte[]) value;
           rowData.putShort(size, (short) bigDecimalInBytes.length);
           size += 2;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index a361f3a..187ba06 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -257,7 +257,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * @return
    */
   private Object getConvertedMeasureValue(Object value, DataType type) {
-    if (type == DataTypes.DECIMAL) {
+    if (DataTypes.isDecimal(type)) {
       if (value != null) {
         value = ((Decimal) value).toJavaBigDecimal();
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index ebc811c..266e69a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -357,7 +357,7 @@ public class IntermediateFileMerger implements Callable<Void> {
             stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row));
           } else if (dataType == DataTypes.DOUBLE) {
             stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
-          } else if (dataType == DataTypes.DECIMAL) {
+          } else if (DataTypes.isDecimal(dataType)) {
             byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
             stream.writeInt(bigDecimalInBytes.length);
             stream.write(bigDecimalInBytes);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 8a60657..5b9e091 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -305,7 +305,7 @@ public class SortDataRows {
               stream.writeLong((Long) value);
             } else if (dataType == DataTypes.DOUBLE) {
               stream.writeDouble((Double) value);
-            } else if (dataType == DataTypes.DECIMAL) {
+            } else if (DataTypes.isDecimal(dataType)) {
               BigDecimal val = (BigDecimal) value;
               byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
               stream.writeInt(bigDecimalInBytes.length);
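
The sort and merge changes in this and the surrounding files are all the same mechanical substitution; the on-disk format for decimals is untouched. A BigDecimal is still serialized through DataTypeUtil.bigDecimalToByte and written length-prefixed, as this sketch of the write path shows (stream is assumed to be a DataOutputStream; note that UnsafeCarbonRowPage and UnsafeIntermediateFileMerger use a short length prefix, while SortDataRows and IntermediateFileMerger use an int):

    val value = new java.math.BigDecimal("123.45")            // illustrative value
    val bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(value)
    stream.writeInt(bigDecimalInBytes.length)                 // length prefix
    stream.write(bigDecimalInBytes)                           // payload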

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 91bc83c..2f87cf7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -355,7 +355,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
             measures[index++] = stream.readLong();
           } else if (dataType == DataTypes.DOUBLE) {
             measures[index++] = stream.readDouble();
-          } else if (dataType == DataTypes.DECIMAL) {
+          } else if (DataTypes.isDecimal(dataType)) {
             int len = stream.readInt();
             byte[] buff = new byte[len];
             stream.readFully(buff);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index fdf44cf..7882cd4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -504,7 +504,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     DataType[] type = model.getMeasureDataType();
     for (int j = 0; j < type.length; j++) {
-      if (type[j] != DataTypes.BYTE && type[j] != DataTypes.DECIMAL) {
+      if (type[j] != DataTypes.BYTE && !DataTypes.isDecimal(type[j])) {
         otherMeasureIndexList.add(j);
       } else {
         customMeasureIndexList.add(j);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index d2cf1c4..6a9aba1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -114,14 +114,13 @@ public class TablePage {
     for (int i = 0; i < measurePages.length; i++) {
       TableSpec.MeasureSpec spec = model.getTableSpec().getMeasureSpec(i);
       ColumnPage page;
-      if (spec.getSchemaDataType() == DataTypes.DECIMAL) {
+      if (DataTypes.isDecimal(spec.getSchemaDataType())) {
         page = ColumnPage.newDecimalPage(spec, dataTypes[i], pageSize);
       } else {
         page = ColumnPage.newPage(spec, dataTypes[i], pageSize);
       }
       page.setStatsCollector(
-          PrimitivePageStatsCollector.newInstance(
-              dataTypes[i], spec.getScale(), spec.getPrecision()));
+          PrimitivePageStatsCollector.newInstance(dataTypes[i]));
       measurePages[i] = page;
     }
     boolean hasNoDictionary = noDictDimensionPages.length > 0;
@@ -183,7 +182,7 @@ public class TablePage {
 
       // in compaction flow the measure with decimal type will come as Spark decimal.
       // need to convert it to byte array.
-      if (measurePages[i].getDataType() == DataTypes.DECIMAL &&
+      if (DataTypes.isDecimal(measurePages[i].getDataType()) &&
           model.isCompactionFlow() &&
           value != null) {
         value = ((Decimal) value).toJavaBigDecimal();
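
The stats-collector change follows from the same design move: PrimitivePageStatsCollector.newInstance previously took scale and precision alongside the DataType, and now that a decimal DataType carries both, the extra parameters are dropped. A sketch of the page setup after this change (spec, dataTypes and pageSize as in the hunk above):

    val page =
      if (DataTypes.isDecimal(spec.getSchemaDataType)) {
        ColumnPage.newDecimalPage(spec, dataTypes(i), pageSize)
      } else {
        ColumnPage.newPage(spec, dataTypes(i), pageSize)
      }
    page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes(i)))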