You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 04:15:45 UTC
[11/24] carbondata git commit: [CARBONDATA-1594] Add precision and scale to DecimalType
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index e5cfc84..285abf4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema._
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
@@ -207,16 +207,15 @@ class AlterTableColumnSchemaGenerator(
alterTableModel.dimCols.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column),
- isCol = true,
+ val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+ field,
encoders,
isDimensionCol = true,
- -1,
field.precision,
field.scale,
field.schemaOrdinal + existingColsSize,
+ alterTableModel.highCardinalityDims,
+ alterTableModel.databaseName.getOrElse(dbName),
isSortColumn(field.name.getOrElse(field.column)))
allColumns ++= Seq(columnSchema)
newCols ++= Seq(columnSchema)
@@ -225,17 +224,16 @@ class AlterTableColumnSchemaGenerator(
allColumns ++= tableCols.filter(x => !x.isDimensionColumn)
alterTableModel.msrCols.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column),
- isCol = true,
+ val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+ field,
encoders,
isDimensionCol = false,
- -1,
field.precision,
field.scale,
field.schemaOrdinal + existingColsSize,
- false)
+ alterTableModel.highCardinalityDims,
+ alterTableModel.databaseName.getOrElse(dbName)
+ )
allColumns ++= Seq(columnSchema)
newCols ++= Seq(columnSchema)
})
@@ -299,33 +297,49 @@ class AlterTableColumnSchemaGenerator(
newCols
}
- private def getColumnSchema(dataType: DataType, colName: String, isCol: Boolean,
- encoders: java.util.List[Encoding], isDimensionCol: Boolean,
- colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int,
- isSortColumn: Boolean): ColumnSchema = {
+}
+
+// TODO: move this to carbon store API
+object TableNewProcessor {
+ def apply(cm: TableModel): TableInfo = {
+ new TableNewProcessor(cm).process
+ }
+
+ def createColumnSchema(
+ field: Field,
+ encoders: java.util.List[Encoding],
+ isDimensionCol: Boolean,
+ precision: Int,
+ scale: Int,
+ schemaOrdinal: Int,
+ highCardinalityDims: Seq[String],
+ databaseName: String,
+ isSortColumn: Boolean = false): ColumnSchema = {
+ val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse(""))
+ if (DataTypes.isDecimal(dataType)) {
+ dataType.asInstanceOf[DecimalType].setPrecision(field.precision)
+ dataType.asInstanceOf[DecimalType].setScale(field.scale)
+ }
val columnSchema = new ColumnSchema()
columnSchema.setDataType(dataType)
+ val colName = field.name.getOrElse(field.column)
columnSchema.setColumnName(colName)
- if (alterTableModel.highCardinalityDims.contains(colName)) {
+ if (highCardinalityDims.contains(colName)) {
encoders.remove(Encoding.DICTIONARY)
}
if (dataType == DataTypes.DATE) {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
- if (dataType == DataTypes.TIMESTAMP && !alterTableModel.highCardinalityDims.contains(colName)) {
+ if (dataType == DataTypes.TIMESTAMP && ! highCardinalityDims.contains(colName)) {
encoders.add(Encoding.DIRECT_DICTIONARY)
}
- val colPropMap = new java.util.HashMap[String, String]()
columnSchema.setEncodingList(encoders)
val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
- val columnUniqueId = colUniqueIdGenerator.generateUniqueId(
- alterTableModel.databaseName.getOrElse(dbName),
- columnSchema)
+ val columnUniqueId = colUniqueIdGenerator.generateUniqueId(databaseName, columnSchema)
columnSchema.setColumnUniqueId(columnUniqueId)
columnSchema.setColumnReferenceId(columnUniqueId)
- columnSchema.setColumnar(isCol)
+ columnSchema.setColumnar(true)
columnSchema.setDimensionColumn(isDimensionCol)
- columnSchema.setColumnGroup(colGroup)
columnSchema.setPrecision(precision)
columnSchema.setScale(scale)
columnSchema.setSchemaOrdinal(schemaOrdinal)
@@ -334,31 +348,25 @@ class AlterTableColumnSchemaGenerator(
columnSchema
}
}
-object TableNewProcessor {
- def apply(cm: TableModel): TableInfo = {
- new TableNewProcessor(cm).process
- }
-}
class TableNewProcessor(cm: TableModel) {
- var index = 0
- var rowGroup = 0
-
def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
fieldChildren.foreach(fields => {
fields.foreach(field => {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column), index,
- isCol = true, encoders, isDimensionCol = true, rowGroup, field.precision, field.scale,
- field.schemaOrdinal)
+ val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+ field,
+ encoders,
+ isDimensionCol = true,
+ field.precision,
+ field.scale,
+ field.schemaOrdinal,
+ cm.highcardinalitydims.getOrElse(Seq()),
+ cm.databaseName)
allColumns ++= Seq(columnSchema)
- index = index + 1
- rowGroup = rowGroup + 1
if (field.children.get != null) {
columnSchema.setNumberOfChild(field.children.get.size)
allColumns ++= getAllChildren(field.children)
@@ -368,39 +376,6 @@ class TableNewProcessor(cm: TableModel) {
allColumns
}
- def getColumnSchema(dataType: DataType, colName: String, index: Integer, isCol: Boolean,
- encoders: java.util.List[Encoding], isDimensionCol: Boolean,
- colGroup: Integer, precision: Integer, scale: Integer, schemaOrdinal: Int): ColumnSchema = {
- val columnSchema = new ColumnSchema()
- columnSchema.setDataType(dataType)
- columnSchema.setColumnName(colName)
- val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
- if (highCardinalityDims.contains(colName)) {
- encoders.remove(Encoding.DICTIONARY)
- }
- if (dataType == DataTypes.DATE) {
- encoders.add(Encoding.DIRECT_DICTIONARY)
- }
- if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) {
- encoders.add(Encoding.DIRECT_DICTIONARY)
- }
- columnSchema.setEncodingList(encoders)
- val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
- val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
- columnSchema)
- columnSchema.setColumnUniqueId(columnUniqueId)
- columnSchema.setColumnReferenceId(columnUniqueId)
- columnSchema.setColumnar(isCol)
- columnSchema.setDimensionColumn(isDimensionCol)
- columnSchema.setColumnGroup(colGroup)
- columnSchema.setPrecision(precision)
- columnSchema.setScale(scale)
- columnSchema.setSchemaOrdinal(schemaOrdinal)
- columnSchema.setSortColumn(false)
- // TODO: Need to fill RowGroupID, converted type
- // & Number of Children after DDL finalization
- columnSchema
- }
// process create dml fields and create wrapper TableInfo object
def process: TableInfo = {
@@ -414,38 +389,34 @@ class TableNewProcessor(cm: TableModel) {
val field = cm.dimCols.find(keyDim equals _.column).get
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column),
- index,
- isCol = true,
+ val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+ field,
encoders,
isDimensionCol = true,
- -1,
field.precision,
field.scale,
- field.schemaOrdinal)
+ field.schemaOrdinal,
+ cm.highcardinalitydims.getOrElse(Seq()),
+ cm.databaseName)
columnSchema.setSortColumn(true)
allColumns :+= columnSchema
index = index + 1
}
- cm.dimCols.foreach(field => {
+ cm.dimCols.foreach { field =>
val sortField = cm.sortKeyDims.get.find(field.column equals _)
if (sortField.isEmpty) {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column),
- index,
- isCol = true,
+ val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+ field,
encoders,
isDimensionCol = true,
- -1,
field.precision,
field.scale,
- field.schemaOrdinal)
+ field.schemaOrdinal,
+ cm.highcardinalitydims.getOrElse(Seq()),
+ cm.databaseName)
allColumns :+= columnSchema
index = index + 1
if (field.children.isDefined && field.children.get != null) {
@@ -453,37 +424,37 @@ class TableNewProcessor(cm: TableModel) {
allColumns ++= getAllChildren(field.children)
}
}
- })
+ }
- cm.msrCols.foreach(field => {
+ cm.msrCols.foreach { field =>
val encoders = new java.util.ArrayList[Encoding]()
- val columnSchema: ColumnSchema = getColumnSchema(
- DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
- field.name.getOrElse(field.column),
- index,
- isCol = true,
+ val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+ field,
encoders,
isDimensionCol = false,
- -1,
field.precision,
field.scale,
- field.schemaOrdinal)
+ field.schemaOrdinal,
+ cm.highcardinalitydims.getOrElse(Seq()),
+ cm.databaseName)
allColumns :+= columnSchema
index = index + 1
measureCount += 1
- })
+ }
// Check if there is any duplicate measures or dimensions.
// Its based on the dimension name and measure name
- allColumns.groupBy(_.getColumnName).foreach(f => if (f._2.size > 1) {
- val name = f._1
- LOGGER.error(s"Duplicate column found with name: $name")
- LOGGER.audit(
- s"Validation failed for Create/Alter Table Operation " +
- s"for ${ cm.databaseName }.${ cm.tableName }" +
- s"Duplicate column found with name: $name")
- sys.error(s"Duplicate dimensions found with name: $name")
- })
+ allColumns.groupBy(_.getColumnName).foreach { f =>
+ if (f._2.size > 1) {
+ val name = f._1
+ LOGGER.error(s"Duplicate column found with name: $name")
+ LOGGER.audit(
+ s"Validation failed for Create/Alter Table Operation " +
+ s"for ${ cm.databaseName }.${ cm.tableName }" +
+ s"Duplicate column found with name: $name")
+ sys.error(s"Duplicate dimensions found with name: $name")
+ }
+ }
val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
@@ -508,13 +479,21 @@ class TableNewProcessor(cm: TableModel) {
// Adding dummy measure if no measure is provided
if (measureCount == 0) {
val encoders = new java.util.ArrayList[Encoding]()
- val columnSchema: ColumnSchema = getColumnSchema(DataTypes.DOUBLE,
+ val field = Field(
CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
- index,
- true,
+ Some(DataTypes.DOUBLE.getName),
+ Some(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE),
+ None
+ )
+ val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
+ field,
encoders,
- false,
- -1, 0, 0, schemaOrdinal = -1)
+ isDimensionCol = false,
+ field.precision,
+ field.scale,
+ -1,
+ cm.highcardinalitydims.getOrElse(Seq()),
+ cm.databaseName)
columnSchema.setInvisible(true)
allColumns :+= columnSchema
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 1d3783e..64b440b 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -211,7 +211,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
CarbonScalaUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
null);
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
null);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 98a37fa..44fbb37 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -151,8 +151,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
case DoubleType => CarbonType.DOUBLE.getName
case TimestampType => CarbonType.TIMESTAMP.getName
case DateType => CarbonType.DATE.getName
- case decimal: DecimalType => s"${CarbonType.DECIMAL.getName} (${decimal.precision}" +
- s", ${decimal.scale})"
+ case decimal: DecimalType => s"decimal(${decimal.precision}, ${decimal.scale})"
case other => sys.error(s"unsupported type: $other")
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index ef89771..91c07de 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -164,46 +164,49 @@ case class CarbonDictionaryDecoder(
|}
""".stripMargin
- val caseCode = getDictionaryColumnIds(index)._3.getDataType match {
- case CarbonDataTypes.INT =>
- s"""
- |int $value = Integer.parseInt(new String($valueIntern,
- |org.apache.carbondata.core.constants.CarbonCommonConstants
- |.DEFAULT_CHARSET_CLASS));
- """.stripMargin
- case CarbonDataTypes.SHORT =>
- s"""
- |short $value =
- |Short.parseShort(new String($valueIntern,
- |org.apache.carbondata.core.constants.CarbonCommonConstants
- |.DEFAULT_CHARSET_CLASS));
- """.stripMargin
- case CarbonDataTypes.DOUBLE =>
- s"""
- |double $value =
- |Double.parseDouble(new String($valueIntern,
- |org.apache.carbondata.core.constants.CarbonCommonConstants
- |.DEFAULT_CHARSET_CLASS));
- """.stripMargin
- case CarbonDataTypes.LONG =>
- s"""
- |long $value =
- |Long.parseLong(new String($valueIntern,
- |org.apache.carbondata.core.constants.CarbonCommonConstants
- |.DEFAULT_CHARSET_CLASS));
- """.stripMargin
- case CarbonDataTypes.DECIMAL =>
+ val caseCode =
+ if (CarbonDataTypes.isDecimal(getDictionaryColumnIds(index)._3.getDataType)) {
s"""
|org.apache.spark.sql.types.Decimal $value =
|Decimal.apply(new java.math.BigDecimal(
|new String($valueIntern, org.apache.carbondata.core.constants
|.CarbonCommonConstants.DEFAULT_CHARSET_CLASS)));
""".stripMargin
- case _ =>
- s"""
- | UTF8String $value = UTF8String.fromBytes($valueIntern);
+ } else {
+ getDictionaryColumnIds(index)._3.getDataType match {
+ case CarbonDataTypes.INT =>
+ s"""
+ |int $value = Integer.parseInt(new String($valueIntern,
+ |org.apache.carbondata.core.constants.CarbonCommonConstants
+ |.DEFAULT_CHARSET_CLASS));
""".stripMargin
- }
+ case CarbonDataTypes.SHORT =>
+ s"""
+ |short $value =
+ |Short.parseShort(new String($valueIntern,
+ |org.apache.carbondata.core.constants.CarbonCommonConstants
+ |.DEFAULT_CHARSET_CLASS));
+ """.stripMargin
+ case CarbonDataTypes.DOUBLE =>
+ s"""
+ |double $value =
+ |Double.parseDouble(new String($valueIntern,
+ |org.apache.carbondata.core.constants.CarbonCommonConstants
+ |.DEFAULT_CHARSET_CLASS));
+ """.stripMargin
+ case CarbonDataTypes.LONG =>
+ s"""
+ |long $value =
+ |Long.parseLong(new String($valueIntern,
+ |org.apache.carbondata.core.constants.CarbonCommonConstants
+ |.DEFAULT_CHARSET_CLASS));
+ """.stripMargin
+ case _ =>
+ s"""
+ | UTF8String $value = UTF8String.fromBytes($valueIntern);
+ """.stripMargin
+ }
+ }
code +=
s"""
|$caseCode
@@ -381,29 +384,31 @@ object CarbonDictionaryDecoder {
*/
def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
relation: CarbonRelation): types.DataType = {
- carbonDimension.getDataType match {
- case CarbonDataTypes.STRING => StringType
- case CarbonDataTypes.SHORT => ShortType
- case CarbonDataTypes.INT => IntegerType
- case CarbonDataTypes.LONG => LongType
- case CarbonDataTypes.DOUBLE => DoubleType
- case CarbonDataTypes.BOOLEAN => BooleanType
- case CarbonDataTypes.DECIMAL =>
- val scale: Int = carbonDimension.getColumnSchema.getScale
- val precision: Int = carbonDimension.getColumnSchema.getPrecision
- if (scale == 0 && precision == 0) {
- DecimalType(18, 2)
- } else {
- DecimalType(precision, scale)
- }
- case CarbonDataTypes.TIMESTAMP => TimestampType
- case CarbonDataTypes.DATE => DateType
- case CarbonDataTypes.STRUCT =>
- CarbonMetastoreTypes
- .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
- case CarbonDataTypes.ARRAY =>
- CarbonMetastoreTypes
- .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+ if (CarbonDataTypes.isDecimal(carbonDimension.getDataType)) {
+ val scale: Int = carbonDimension.getColumnSchema.getScale
+ val precision: Int = carbonDimension.getColumnSchema.getPrecision
+ if (scale == 0 && precision == 0) {
+ DecimalType(18, 2)
+ } else {
+ DecimalType(precision, scale)
+ }
+ } else {
+ carbonDimension.getDataType match {
+ case CarbonDataTypes.STRING => StringType
+ case CarbonDataTypes.SHORT => ShortType
+ case CarbonDataTypes.INT => IntegerType
+ case CarbonDataTypes.LONG => LongType
+ case CarbonDataTypes.DOUBLE => DoubleType
+ case CarbonDataTypes.BOOLEAN => BooleanType
+ case CarbonDataTypes.TIMESTAMP => TimestampType
+ case CarbonDataTypes.DATE => DateType
+ case CarbonDataTypes.STRUCT =>
+ CarbonMetastoreTypes
+ .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+ case CarbonDataTypes.ARRAY =>
+ CarbonMetastoreTypes
+ .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+ }
}
}
@@ -464,34 +469,6 @@ class CarbonDecoderRDD(
}
}
- def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
- relation: CarbonRelation): types.DataType = {
- carbonDimension.getDataType match {
- case CarbonDataTypes.STRING => StringType
- case CarbonDataTypes.SHORT => ShortType
- case CarbonDataTypes.INT => IntegerType
- case CarbonDataTypes.LONG => LongType
- case CarbonDataTypes.DOUBLE => DoubleType
- case CarbonDataTypes.BOOLEAN => BooleanType
- case CarbonDataTypes.DECIMAL =>
- val scale: Int = carbonDimension.getColumnSchema.getScale
- val precision: Int = carbonDimension.getColumnSchema.getPrecision
- if (scale == 0 && precision == 0) {
- DecimalType(18, 2)
- } else {
- DecimalType(precision, scale)
- }
- case CarbonDataTypes.TIMESTAMP => TimestampType
- case CarbonDataTypes.DATE => DateType
- case CarbonDataTypes.STRUCT =>
- CarbonMetastoreTypes
- .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
- case CarbonDataTypes.ARRAY =>
- CarbonMetastoreTypes
- .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
- }
- }
-
val getDictionaryColumnIds = {
val dictIds: Array[(String, ColumnIdentifier, CarbonDimension)] = output.map { a =>
val attr = aliasMap.getOrElse(a, a)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 6bac0da..2c476ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.hive
-import java.util
import java.util.LinkedHashSet
import scala.Array.canBuildFrom
@@ -29,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
import org.apache.spark.sql.types._
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL
+import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -164,7 +163,7 @@ case class CarbonRelation(
def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
var dType = dataType
- if (dimval.getDataType == DECIMAL) {
+ if (DataTypes.isDecimal(dimval.getDataType)) {
dType +=
"(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
}
@@ -184,7 +183,7 @@ case class CarbonRelation(
def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
var dType = dataType
- if (dimval.getDataType == DECIMAL) {
+ if (DataTypes.isDecimal(dimval.getDataType)) {
dType +=
"(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
index a41f734..f24d24f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
@@ -42,7 +42,7 @@ public class HashPartitionerImpl implements Partitioner<Object[]> {
if (dataType == DataTypes.SHORT || dataType == DataTypes.INT || dataType == DataTypes.LONG) {
hashes[i] = new IntegralHash(indexes.get(i));
} else if (dataType == DataTypes.DOUBLE || dataType == DataTypes.FLOAT ||
- dataType == DataTypes.DECIMAL) {
+ DataTypes.isDecimal(dataType)) {
hashes[i] = new DecimalHash(indexes.get(i));
} else {
hashes[i] = new StringHash(indexes.get(i));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 1d8f941..e5583c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -147,7 +147,7 @@ public class UnsafeCarbonRowPage {
Double doubleVal = (Double) value;
CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
size += 8;
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
BigDecimal decimalVal = (BigDecimal) value;
byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
CarbonUnsafe.getUnsafe()
@@ -233,7 +233,7 @@ public class UnsafeCarbonRowPage {
Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
size += 8;
rowToFill[dimensionSize + mesCount] = doubleVal;
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
byte[] bigDecimalInBytes = new byte[aShort];
size += 2;
@@ -315,7 +315,7 @@ public class UnsafeCarbonRowPage {
double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
size += 8;
stream.writeDouble(doubleVal);
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
byte[] bigDecimalInBytes = new byte[aShort];
size += 2;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 404a521..3972b1c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -335,7 +335,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
row[dimensionCount + mesCount] = stream.readLong();
} else if (dataType == DataTypes.DOUBLE) {
row[dimensionCount + mesCount] = stream.readDouble();
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
short aShort = stream.readShort();
byte[] bigDecimalInBytes = new byte[aShort];
stream.readFully(bigDecimalInBytes);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 9f7d6c3..0c71adc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -329,7 +329,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
} else if (dataType == DataTypes.DOUBLE) {
rowData.putDouble(size, (Double) value);
size += 8;
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
byte[] bigDecimalInBytes = (byte[]) value;
rowData.putShort(size, (short) bigDecimalInBytes.length);
size += 2;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index a361f3a..187ba06 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -257,7 +257,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
* @return
*/
private Object getConvertedMeasureValue(Object value, DataType type) {
- if (type == DataTypes.DECIMAL) {
+ if (DataTypes.isDecimal(type)) {
if (value != null) {
value = ((Decimal) value).toJavaBigDecimal();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index ebc811c..266e69a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -357,7 +357,7 @@ public class IntermediateFileMerger implements Callable<Void> {
stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row));
} else if (dataType == DataTypes.DOUBLE) {
stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
stream.writeInt(bigDecimalInBytes.length);
stream.write(bigDecimalInBytes);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 8a60657..5b9e091 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -305,7 +305,7 @@ public class SortDataRows {
stream.writeLong((Long) value);
} else if (dataType == DataTypes.DOUBLE) {
stream.writeDouble((Double) value);
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
BigDecimal val = (BigDecimal) value;
byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
stream.writeInt(bigDecimalInBytes.length);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 91bc83c..2f87cf7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -355,7 +355,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
measures[index++] = stream.readLong();
} else if (dataType == DataTypes.DOUBLE) {
measures[index++] = stream.readDouble();
- } else if (dataType == DataTypes.DECIMAL) {
+ } else if (DataTypes.isDecimal(dataType)) {
int len = stream.readInt();
byte[] buff = new byte[len];
stream.readFully(buff);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index fdf44cf..7882cd4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -504,7 +504,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
DataType[] type = model.getMeasureDataType();
for (int j = 0; j < type.length; j++) {
- if (type[j] != DataTypes.BYTE && type[j] != DataTypes.DECIMAL) {
+ if (type[j] != DataTypes.BYTE && !DataTypes.isDecimal(type[j])) {
otherMeasureIndexList.add(j);
} else {
customMeasureIndexList.add(j);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f209e8ee/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index d2cf1c4..6a9aba1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -114,14 +114,13 @@ public class TablePage {
for (int i = 0; i < measurePages.length; i++) {
TableSpec.MeasureSpec spec = model.getTableSpec().getMeasureSpec(i);
ColumnPage page;
- if (spec.getSchemaDataType() == DataTypes.DECIMAL) {
+ if (DataTypes.isDecimal(spec.getSchemaDataType())) {
page = ColumnPage.newDecimalPage(spec, dataTypes[i], pageSize);
} else {
page = ColumnPage.newPage(spec, dataTypes[i], pageSize);
}
page.setStatsCollector(
- PrimitivePageStatsCollector.newInstance(
- dataTypes[i], spec.getScale(), spec.getPrecision()));
+ PrimitivePageStatsCollector.newInstance(dataTypes[i]));
measurePages[i] = page;
}
boolean hasNoDictionary = noDictDimensionPages.length > 0;
@@ -183,7 +182,7 @@ public class TablePage {
// in compaction flow the measure with decimal type will come as Spark decimal.
// need to convert it to byte array.
- if (measurePages[i].getDataType() == DataTypes.DECIMAL &&
+ if (DataTypes.isDecimal(measurePages[i].getDataType()) &&
model.isCompactionFlow() &&
value != null) {
value = ((Decimal) value).toJavaBigDecimal();