Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:19:05 UTC
[08/16] carbondata git commit: [CARBONDATA-2168] Support global sort for standard hive partitioning
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 4806f9f..9ea58a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -17,6 +17,8 @@
package org.apache.carbondata.spark.rdd
+import java.util
+
import scala.collection.JavaConverters._
import org.apache.spark.{Partition, SparkContext, TaskContext}
@@ -59,10 +61,12 @@ class CarbonDropPartitionRDD(
val iter = new Iterator[String] {
val split = theSplit.asInstanceOf[CarbonDropPartition]
logInfo("Dropping partition information from : " + split.segmentPath)
-
+ val partitionList = new util.ArrayList[util.List[String]]()
+ partitionList.add(partitions.toList.asJava)
new PartitionMapFileStore().dropPartitions(
split.segmentPath,
- partitions.toList.asJava,
+ partitionList,
uniqueId,
partialMatch)
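
For reference, dropPartitions now takes a list of partition lists (one inner list per dropped spec) instead of a flat list of names. A minimal sketch of building that argument; the partition names are hypothetical:

    import java.util
    import scala.collection.JavaConverters._

    val partitions: Seq[String] = Seq("country=US", "year=2018") // hypothetical spec
    val partitionList = new util.ArrayList[util.List[String]]()
    partitionList.add(partitions.toList.asJava)
    // partitionList is the second argument to the new dropPartitions(...)
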
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 748945d..73be3c8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -26,14 +26,18 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.DataTypeInfo
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.util.{ByteUtil, CarbonSessionInfo}
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, DataTypeUtil}
object CarbonScalaUtil {
def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
@@ -196,21 +200,85 @@ object CarbonScalaUtil {
/**
* Converts incoming value to String after converting data as per the data type.
* @param value Input value to convert
- * @param dataType Datatype to convert and then convert to String
- * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
- * @param dateFormat DataFormat to convert in case of DateType datatype
+ * @param column the column the value belongs to
* @return converted String
*/
- def convertToCarbonFormat(value: String,
- dataType: DataType,
- timeStampFormat: SimpleDateFormat,
- dateFormat: SimpleDateFormat): String = {
+ def convertToCarbonFormat(
+ value: String,
+ column: CarbonColumn,
+ forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary],
+ table: CarbonTable): String = {
+ if (column.hasEncoding(Encoding.DICTIONARY)) {
+ if (column.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ if (column.getDataType.equals(CarbonDataTypes.TIMESTAMP)) {
+ val time = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+ column.getDataType,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+ ).getValueFromSurrogate(value.toInt).toString
+ return DateTimeUtils.timestampToString(time.toLong * 1000)
+ } else if (column.getDataType.equals(CarbonDataTypes.DATE)) {
+ val date = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+ column.getDataType,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
+ ).getValueFromSurrogate(value.toInt).toString
+ return DateTimeUtils.dateToString(date.toInt)
+ }
+ }
+ val dictionaryPath =
+ table.getTableInfo.getFactTable.getTableProperties.get(
+ CarbonCommonConstants.DICTIONARY_PATH)
+ val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
+ table.getAbsoluteTableIdentifier,
+ column.getColumnIdentifier, column.getDataType,
+ dictionaryPath)
+ return forwardDictionaryCache.get(
+ dictionaryColumnUniqueIdentifier).getDictionaryValueForKey(value.toInt)
+ }
try {
- dataType match {
- case TimestampType =>
- timeStampFormat.format(DateTimeUtils.stringToTime(value))
- case DateType =>
- dateFormat.format(DateTimeUtils.stringToTime(value))
+ column.getDataType match {
+ case CarbonDataTypes.TIMESTAMP =>
+ DateTimeUtils.timestampToString(value.toLong * 1000)
+ case CarbonDataTypes.DATE =>
+ DateTimeUtils.dateToString(DateTimeUtils.millisToDays(value.toLong))
+ case _ => value
+ }
+ } catch {
+ case e: Exception =>
+ value
+ }
+ }
+
+ /**
+ * Converts an incoming static partition value as per the column's data type.
+ * @param value Input value to convert
+ * @param column the column the value belongs to
+ * @return converted String
+ */
+ def convertStaticPartitions(
+ value: String,
+ column: ColumnSchema,
+ table: CarbonTable): String = {
+ try {
+ if (column.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ if (column.getDataType.equals(CarbonDataTypes.TIMESTAMP)) {
+ return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+ column.getDataType,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+ ).generateDirectSurrogateKey(value).toString
+ } else if (column.getDataType.equals(CarbonDataTypes.DATE)) {
+ return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+ column.getDataType,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
+ ).generateDirectSurrogateKey(value).toString
+ }
+ }
+ column.getDataType match {
+ case CarbonDataTypes.TIMESTAMP =>
+ DataTypeUtil.getDataDataTypeForNoDictionaryColumn(value,
+ column.getDataType,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT).toString
+ case CarbonDataTypes.DATE =>
+ DateTimeUtils.stringToDate(UTF8String.fromString(value)).get.toString
case _ => value
}
} catch {
@@ -229,11 +297,11 @@ object CarbonScalaUtil {
partitionSpec: Map[String, String],
table: CarbonTable,
timeFormat: SimpleDateFormat,
- dateFormat: SimpleDateFormat,
- serializationNullFormat: String,
- badRecordAction: String,
- isEmptyBadRecord: Boolean): Map[String, String] = {
+ dateFormat: SimpleDateFormat): Map[String, String] = {
val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__"
+ val cacheProvider: CacheProvider = CacheProvider.getInstance
+ val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
partitionSpec.map{ case (col, pvalue) =>
// replace special string with empty value.
val value = if (pvalue == null) {
@@ -246,17 +314,15 @@ object CarbonScalaUtil {
val carbonColumn = table.getColumnByName(table.getTableName, col.toLowerCase)
val dataType = CarbonScalaUtil.convertCarbonToSparkDataType(carbonColumn.getDataType)
try {
- if (isEmptyBadRecord && value.length == 0 &&
- badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString) &&
- dataType != StringType) {
- (col, hiveignorepartition)
- } else if (!isEmptyBadRecord && value.length == 0 && dataType != StringType) {
- (col, hivedefaultpartition)
- } else if (value.equals(hivedefaultpartition)) {
+ if (value.equals(hivedefaultpartition)) {
(col, value)
} else {
- val convertedString = CarbonScalaUtil.convertToString(
- value, dataType, timeFormat, dateFormat, serializationNullFormat)
+ val convertedString =
+ CarbonScalaUtil.convertToCarbonFormat(
+ value,
+ carbonColumn,
+ forwardDictionaryCache,
+ table)
if (convertedString == null) {
(col, hivedefaultpartition)
} else {
@@ -265,13 +331,7 @@ object CarbonScalaUtil {
}
} catch {
case e: Exception =>
- // If it is bad record ignore case then add with special string so that it will be
- // filtered after this.
- if (badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString)) {
- (col, hiveignorepartition)
- } else {
- (col, hivedefaultpartition)
- }
+ (col, value)
}
}
}
@@ -306,10 +366,7 @@ object CarbonScalaUtil {
f.spec,
table,
timeFormat,
- dateFormat,
- serializeFormat,
- badRecordAction,
- isEmptyBadRecord)
+ dateFormat)
f.copy(spec = changedSpec)
}.filterNot{ p =>
// Filter the special bad record ignore case string
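
As a side note (not part of the patch), convertStaticPartitions and convertToCarbonFormat are inverses for direct-dictionary columns: one turns an external value into a surrogate key, the other turns the surrogate back into a readable value. A rough sketch for a DATE column, with an illustrative literal:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
    import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}

    val generator = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
      CarbonDataTypes.DATE, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
    // Load path (convertStaticPartitions): external value -> surrogate key
    val surrogate = generator.generateDirectSurrogateKey("2018-02-27")
    // Read path (convertToCarbonFormat): surrogate key -> original value
    val restored = generator.getValueFromSurrogate(surrogate)
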
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7d49c11..9bdaddb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
@@ -32,15 +33,15 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Expression, GenericInternalRow, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -55,7 +56,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
@@ -65,12 +66,14 @@ import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -505,7 +508,7 @@ case class CarbonLoadDataCommand(
carbonLoadModel: CarbonLoadModel,
hadoopConf: Configuration,
dataFrame: Option[DataFrame],
- operationContext: OperationContext) = {
+ operationContext: OperationContext): Unit = {
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
@@ -544,17 +547,76 @@ case class CarbonLoadDataCommand(
CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
isEmptyBadRecord)
CarbonSession.threadSet("partition.operationcontext", operationContext)
+ // Build the logical plan for the input data (csv files or dataframe).
+ val allCols = new ArrayBuffer[String]()
+ allCols ++= table.getAllDimensions.asScala.map(_.getColName)
+ allCols ++= table.getAllMeasures.asScala.map(_.getColName)
+ var attributes =
+ StructType(allCols.map(StructField(_, StringType))).toAttributes
+
+ var partitionsLen = 0
+ val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
+ def transformQuery(rdd: RDD[Row], isDataFrame: Boolean) = {
+ val updatedRdd = convertData(rdd, sparkSession, carbonLoadModel, isDataFrame)
+ val catalogAttributes = catalogTable.schema.toAttributes
+ attributes = attributes.map(a => {
+ catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
+ })
+ attributes = attributes.map { attr =>
+ val column = table.getColumnByName(table.getTableName, attr.name)
+ if (column.hasEncoding(Encoding.DICTIONARY)) {
+ AttributeReference(
+ attr.name,
+ IntegerType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+ } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
+ AttributeReference(
+ attr.name,
+ LongType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+ } else {
+ attr
+ }
+ }
+ // Only select the required columns
+ val output = if (partition.nonEmpty) {
+ val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) }
+ catalogTable.schema.map { attr =>
+ attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+ }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
+ } else {
+ catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+ }
+ partitionsLen = rdd.partitions.length
+ val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession))
+ if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
+ val sortColumns = table.getSortColumns(table.getTableName)
+ Sort(output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)),
+ true,
+ child)
+ } else {
+ child
+ }
+ }
+
try {
val query: LogicalPlan = if (dataFrame.isDefined) {
val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
- val attributes =
+ val dfAttributes =
StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
- val len = attributes.length
+ val partitionValues = if (partition.nonEmpty) {
+ partition.values.filter(_.nonEmpty).map(_.get).toArray
+ } else {
+ Array[String]()
+ }
+ val len = dfAttributes.length
val rdd = dataFrame.get.rdd.map { f =>
val data = new Array[Any](len)
var i = 0
- while (i < len) {
+ while (i < f.length) {
data(i) =
UTF8String.fromString(
CarbonScalaUtil.getString(f.get(i),
@@ -565,20 +627,32 @@ case class CarbonLoadDataCommand(
dateFormat))
i = i + 1
}
- InternalRow.fromSeq(data)
+ if (partitionValues.length > 0) {
+ var j = 0
+ while (i < len) {
+ data(i) = UTF8String.fromString(partitionValues(j))
+ j = j + 1
+ i = i + 1
+ }
+ }
+ Row.fromSeq(data)
}
- if (updateModel.isDefined) {
+ val transRdd = if (updateModel.isDefined) {
// Get the updated query plan in case of update scenario
- getLogicalQueryForUpdate(sparkSession, catalogTable, attributes, rdd)
+ Dataset.ofRows(
+ sparkSession,
+ getLogicalQueryForUpdate(
+ sparkSession,
+ catalogTable,
+ dfAttributes,
+ rdd.map(row => InternalRow.fromSeq(row.toSeq)),
+ carbonLoadModel)).rdd
} else {
- LogicalRDD(attributes, rdd)(sparkSession)
+ rdd
}
-
+ transformQuery(transRdd, true)
} else {
- // input data from csv files. Convert to logical plan
- val attributes =
- StructType(carbonLoadModel.getCsvHeaderColumns.map(
- StructField(_, StringType))).toAttributes
+
val rowDataTypes = attributes.map { attribute =>
catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
case Some(attr) => attr.dataType
@@ -592,41 +666,12 @@ case class CarbonLoadDataCommand(
case _ => false
}
}
- val len = rowDataTypes.length
- var rdd =
- DataLoadingUtil.csvFileScanRDD(
- sparkSession,
- model = carbonLoadModel,
- hadoopConf)
- .map { row =>
- val data = new Array[Any](len)
- var i = 0
- val input = row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]]
- val inputLen = Math.min(input.length, len)
- while (i < inputLen) {
- data(i) = UTF8String.fromString(input(i))
- // If partition column then update empty value with special string otherwise spark
- // makes it as null so we cannot internally handle badrecords.
- if (partitionColumns(i)) {
- if (input(i) != null && input(i).isEmpty) {
- data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
- }
- }
- i = i + 1
- }
- InternalRow.fromSeq(data)
-
- }
- // Only select the required columns
- val output = if (partition.nonEmpty) {
- val lowerCasePartition = partition.map{case(key, value) => (key.toLowerCase, value)}
- catalogTable.schema.map { attr =>
- attributes.find(_.name.equalsIgnoreCase(attr.name)).get
- }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
- } else {
- catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
- }
- Project(output, LogicalRDD(attributes, rdd)(sparkSession))
+ val columnCount = carbonLoadModel.getCsvHeaderColumns.length
+ var rdd = DataLoadingUtil.csvFileScanRDD(
+ sparkSession,
+ model = carbonLoadModel,
+ hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
+ transformQuery(rdd.asInstanceOf[RDD[Row]], false)
}
val convertRelation = convertToLogicalRelation(
catalogTable,
@@ -635,24 +680,29 @@ case class CarbonLoadDataCommand(
carbonLoadModel,
sparkSession,
operationContext)
+ val logicalPlan = if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
+ var numPartitions =
+ CarbonDataProcessorUtil.getGlobalSortPartitions(carbonLoadModel.getGlobalSortPartitions)
+ if (numPartitions <= 0) {
+ numPartitions = partitionsLen
+ }
+ if (numPartitions > 0) {
+ Dataset.ofRows(sparkSession, query).repartition(numPartitions).logicalPlan
+ } else {
+ query
+ }
+ } else {
+ query
+ }
+
val convertedPlan =
CarbonReflectionUtils.getInsertIntoCommand(
table = convertRelation,
partition = partition,
- query = query,
+ query = logicalPlan,
overwrite = false,
ifPartitionNotExists = false)
- if (isOverwriteTable && partition.nonEmpty) {
- overwritePartition(
- sparkSession,
- table,
- convertedPlan,
- serializationNullFormat,
- badRecordAction,
- isEmptyBadRecord.toBoolean)
- } else {
- Dataset.ofRows(sparkSession, convertedPlan)
- }
+ Dataset.ofRows(sparkSession, convertedPlan)
} finally {
CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT)
@@ -660,6 +710,14 @@ case class CarbonLoadDataCommand(
CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
CarbonSession.threadUnset("partition.operationcontext")
+ if (isOverwriteTable) {
+ DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
+ // Clean the overwriting segments if any.
+ new PartitionMapFileStore().cleanSegments(
+ table,
+ CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava,
+ false)
+ }
}
try {
// Trigger auto compaction
@@ -676,6 +734,48 @@ case class CarbonLoadDataCommand(
}
}
+ private def convertData(
+ originRDD: RDD[Row],
+ sparkSession: SparkSession,
+ model: CarbonLoadModel,
+ isDataFrame: Boolean): RDD[InternalRow] = {
+ model.setPartitionId("0")
+ val sc = sparkSession.sparkContext
+ val modelBroadcast = sc.broadcast(model)
+ val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+
+ val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+ // 1. Input
+ var convertRDD =
+ if (isDataFrame) {
+ originRDD.mapPartitions{rows =>
+ DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)
+ }
+ } else {
+ originRDD.map{row =>
+ val array = new Array[AnyRef](row.length)
+ var i = 0
+ while (i < array.length) {
+ array(i) = row.get(i).asInstanceOf[AnyRef]
+ i = i + 1
+ }
+ array
+ }
+ }
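+ // 2. Convert: run the rows through the data load converter (keepActualData = true).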
+ val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) =>
+ DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+ DataLoadProcessorStepOnSpark.inputAndconvertFunc(
+ rows,
+ index,
+ modelBroadcast,
+ partialSuccessAccum,
+ inputStepRowCounter,
+ keepActualData = true)
+ }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
+
+ finalRDD
+ }
+
/**
* Create the logical plan for update scenario. Here we should drop the segmentid column from the
* input rdd.
@@ -684,7 +784,8 @@ case class CarbonLoadDataCommand(
sparkSession: SparkSession,
catalogTable: CatalogTable,
attributes: Seq[AttributeReference],
- rdd: RDD[InternalRow]): LogicalPlan = {
+ rdd: RDD[InternalRow],
+ carbonLoadModel: CarbonLoadModel): LogicalPlan = {
sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
// In case of update, we don't need the segmentid column in case of partitioning
val dropAttributes = attributes.dropRight(1)
@@ -698,6 +799,8 @@ case class CarbonLoadDataCommand(
}
}.get
}
+ carbonLoadModel.setCsvHeader(catalogTable.schema.map(_.name.toLowerCase).mkString(","))
+ carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(","))
Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
}
@@ -709,7 +812,16 @@ case class CarbonLoadDataCommand(
sparkSession: SparkSession,
operationContext: OperationContext): LogicalRelation = {
val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
- val metastoreSchema = StructType(catalogTable.schema.fields.map(_.copy(dataType = StringType)))
+ val metastoreSchema = StructType(catalogTable.schema.fields.map{f =>
+ val column = table.getColumnByName(table.getTableName, f.name)
+ if (column.hasEncoding(Encoding.DICTIONARY)) {
+ f.copy(dataType = IntegerType)
+ } else if (f.dataType == TimestampType || f.dataType == DateType) {
+ f.copy(dataType = LongType)
+ } else {
+ f
+ }
+ })
val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
val catalog = new CatalogFileIndex(
sparkSession, catalogTable, sizeInBytes)
@@ -718,20 +830,18 @@ case class CarbonLoadDataCommand(
} else {
catalog.filterPartitions(Nil) // materialize all the partitions in memory
}
- val partitionSchema =
+ var partitionSchema =
StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(field =>
metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
- val overWriteLocal = if (overWrite && partition.nonEmpty) {
- false
- } else {
- overWrite
- }
val dataSchema =
StructType(metastoreSchema
- .filterNot(field => partitionSchema.contains(field.name)))
+ .filterNot(field => partitionSchema.contains(field)))
+ if (partition.nonEmpty) {
+ partitionSchema = StructType(partitionSchema.fields.map(_.copy(dataType = StringType)))
+ }
val options = new mutable.HashMap[String, String]()
options ++= catalogTable.storage.properties
- options += (("overwrite", overWriteLocal.toString))
+ options += (("overwrite", overWrite.toString))
options += (("onepass", loadModel.getUseOnePass.toString))
options += (("dicthost", loadModel.getDictionaryServerHost))
options += (("dictport", loadModel.getDictionaryServerPort.toString))
@@ -761,108 +871,6 @@ case class CarbonLoadDataCommand(
Some(catalogTable))
}
- /**
- * Overwrite the partition data if static partitions are specified.
- * @param sparkSession
- * @param table
- * @param logicalPlan
- */
- private def overwritePartition(
- sparkSession: SparkSession,
- table: CarbonTable,
- logicalPlan: LogicalPlan,
- serializationNullFormat: String,
- badRecordAction: String,
- isEmptyBadRecord: Boolean): Unit = {
- val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
-
- // Update the partitions as per the datatype expect for time and datetype as we
- // expect user provides the format in standard spark/hive formats.
- val updatedPartitions = CarbonScalaUtil.updatePartitions(
- partition.filter(_._2.isDefined).map(f => (f._1, f._2.get)),
- table,
- timeFormat = null,
- dateFormat = null,
- serializationNullFormat,
- badRecordAction,
- isEmptyBadRecord)
- val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
- identifier,
- Some(updatedPartitions))
- val partitionNames = existingPartitions.toList.flatMap { partition =>
- partition.spec.seq.map{case (column, value) => column + "=" + value}
- }.toSet
- val uniqueId = System.currentTimeMillis().toString
- val segments = new SegmentStatusManager(
- table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
- // If any existing partitions need to be overwritten then drop from partitionmap
- if (partitionNames.nonEmpty) {
- try {
- // First drop the partitions from partition mapper files of each segment
- new CarbonDropPartitionRDD(
- sparkSession.sparkContext,
- table.getTablePath,
- segments.asScala,
- partitionNames.toSeq,
- uniqueId,
- partialMatch = false).collect()
- } catch {
- case e: Exception =>
- // roll back the drop partitions from carbon store
- new CarbonDropPartitionCommitRDD(
- sparkSession.sparkContext,
- table.getTablePath,
- segments.asScala,
- success = false,
- uniqueId,
- partitionNames.toSeq).collect()
- throw e
- }
-
- try {
- Dataset.ofRows(sparkSession, logicalPlan)
- } catch {
- case e: Exception =>
- // roll back the drop partitions from carbon store
- new CarbonDropPartitionCommitRDD(
- sparkSession.sparkContext,
- table.getTablePath,
- segments.asScala,
- success = false,
- uniqueId,
- partitionNames.toSeq).collect()
- throw e
- }
- // Commit the removed partitions in carbon store.
- new CarbonDropPartitionCommitRDD(
- sparkSession.sparkContext,
- table.getTablePath,
- segments.asScala,
- success = true,
- uniqueId,
- partitionNames.toSeq).collect()
- // get valid segments
- val validsegments =
- new SegmentStatusManager(
- table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
- // Update the loadstatus with update time to clear cache from driver.
- CarbonUpdateUtil.updateTableMetadataStatus(
- new util.HashSet[String](validsegments),
- table,
- uniqueId,
- true,
- new util.ArrayList[String])
- DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
- // Clean the overwriting segments if any.
- new PartitionMapFileStore().cleanSegments(
- table,
- CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava,
- false)
- } else {
- // Otherwise its a normal load
- Dataset.ofRows(sparkSession, logicalPlan)
- }
- }
def getDataFrameWithTupleID(): DataFrame = {
val fields = dataFrame.get.schema.fields
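
Taking a step back, the net effect of this file's changes for GLOBAL_SORT is: convert the input rows, project the needed columns, wrap them in a catalyst Sort on the table's sort columns, then repartition. A condensed sketch of the plan shape (output, sortColumns and child stand in for the values computed in transformQuery):

    import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}

    def globalSortPlan(
        output: Seq[Attribute],
        sortColumns: Seq[String],
        child: LogicalPlan): LogicalPlan =
      Sort(
        output.filter(a => sortColumns.contains(a.name)).map(SortOrder(_, Ascending)),
        global = true,
        child)
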
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 17749c8..d2c691b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -36,14 +36,17 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types._
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -110,6 +113,7 @@ with Serializable {
model.setDictionaryServerHost(options.getOrElse("dicthost", null))
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+ model.setPartitionLoad(true)
// Set the update timestamp if user sets in case of update query. It needs to be updated
// in load status update time
val updateTimeStamp = options.get("updatetimestamp")
@@ -231,7 +235,9 @@ private class CarbonOutputWriter(path: String,
fieldTypes: Seq[DataType],
taskNo : String)
extends OutputWriter with AbstractCarbonOutputWriter {
- val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
+ val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+ val partitions =
+ getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
val staticPartition: util.HashMap[String, Boolean] = {
val staticPart = context.getConfiguration.get("carbon.staticpartition")
if (staticPart != null) {
@@ -272,24 +278,42 @@ private class CarbonOutputWriter(path: String,
val formattedPartitions = updatedPartitions.map {case (col, value) =>
// Only convert the static partitions to the carbon format and use it while loading data
// to carbon.
- if (staticPartition.asScala.getOrElse(col, false)) {
- (col, CarbonScalaUtil.convertToCarbonFormat(value,
- CarbonScalaUtil.convertCarbonToSparkDataType(
- table.getColumnByName(table.getTableName, col).getDataType),
- timeFormat,
- dateFormat))
- } else {
- (col, value)
- }
+ (col, value)
}
- (formattedPartitions, formattedPartitions.map(_._2))
+ (formattedPartitions, updatePartitions(formattedPartitions.map(_._2)))
} else {
- (updatedPartitions, updatedPartitions.map(_._2))
+ (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
}
} else {
(Map.empty[String, String].toArray, Array.empty)
}
- val writable = new StringArrayWritable()
+
+ val writable = new ObjectArrayWritable
+
+ private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
+ model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
+ .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
+
+ val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
+ DataTypes.INT
+ } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
+ col.getDataType.equals(DataTypes.DATE)) {
+ DataTypes.LONG
+ } else {
+ col.getDataType
+ }
+ if (staticPartition != null) {
+ DataTypeUtil.getDataBasedOnDataType(
+ CarbonScalaUtil.convertStaticPartitions(
+ partitionData(index),
+ col,
+ model.getCarbonDataLoadSchema.getCarbonTable),
+ dataType)
+ } else {
+ DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType)
+ }
+ }.toArray
+ }
private val recordWriter: CarbonRecordWriter = {
context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
@@ -302,11 +326,18 @@ private class CarbonOutputWriter(path: String,
// TODO Implement writesupport interface to support writing Row directly to recordwriter
def writeCarbon(row: InternalRow): Unit = {
- val data = new Array[String](fieldTypes.length + partitionData.length)
+ val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
var i = 0
while (i < fieldTypes.length) {
if (!row.isNullAt(i)) {
- data(i) = row.getString(i)
+ fieldTypes(i) match {
+ case StringType =>
+ data(i) = row.getString(i)
+ case d: DecimalType =>
+ data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+ case other =>
+ data(i) = row.get(i, other)
+ }
}
i += 1
}
@@ -349,10 +380,7 @@ private class CarbonOutputWriter(path: String,
updatedPartitions.toMap,
table,
timeFormat,
- dateFormat,
- serializeFormat,
- badRecordAction,
- isEmptyBadRecord)
+ dateFormat)
formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
new PartitionMapFileStore().writePartitionMapFile(
segmentPath,
@@ -360,10 +388,13 @@ private class CarbonOutputWriter(path: String,
partitonList)
}
- def getPartitionsFromPath(path: String, attemptContext: TaskAttemptContext): Array[String] = {
+ def getPartitionsFromPath(
+ path: String,
+ attemptContext: TaskAttemptContext,
+ model: CarbonLoadModel): Array[String] = {
var attemptId = attemptContext.getTaskAttemptID.toString + "/"
if (path.indexOf(attemptId) <= 0) {
- val model = CarbonTableOutputFormat.getLoadModel(attemptContext.getConfiguration)
+
attemptId = model.getTableName + "/"
}
val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
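
One more note on writeCarbon above: values are now pulled out of the InternalRow with their Spark types rather than as strings. A hedged sketch of that per-field dispatch (fieldTypes and row as in the writer):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.{DataType, DecimalType, StringType}

    def extract(row: InternalRow, i: Int, fieldType: DataType): AnyRef =
      fieldType match {
        case StringType => row.getString(i) // UTF8String rendered as a String
        case d: DecimalType => row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
        case other => row.get(i, other) // typed object (Int, Long, ...)
      }
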
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
index fb78deb..dc2fbbb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
@@ -37,6 +37,8 @@ public class DataField implements Serializable {
private String timestampFormat;
+ private boolean useActualData;
+
public boolean hasDictionaryEncoding() {
return column.hasEncoding(Encoding.DICTIONARY);
}
@@ -60,4 +62,12 @@ public class DataField implements Serializable {
public void setTimestampFormat(String timestampFormat) {
this.timestampFormat = timestampFormat;
}
+
+ public boolean isUseActualData() {
+ return useActualData;
+ }
+
+ public void setUseActualData(boolean useActualData) {
+ this.useActualData = useActualData;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f7eff81..ba24d41 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStep
import org.apache.carbondata.processing.loading.steps.DataConverterProcessorWithBucketingStepImpl;
import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl;
import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -62,6 +63,8 @@ public final class DataLoadProcessBuilder {
return buildInternalForBucketing(inputIterators, configuration);
} else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
return buildInternalForBatchSort(inputIterators, configuration);
+ } else if (loadModel.isPartitionLoad()) {
+ return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
} else {
return buildInternal(inputIterators, configuration);
}
@@ -96,6 +99,32 @@ public final class DataLoadProcessBuilder {
return new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
}
+ /**
+ * Build the pipeline for partition load
+ */
+ private AbstractDataLoadProcessorStep buildInternalForPartitionLoad(
+ CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration,
+ SortScopeOptions.SortScope sortScope) {
+ // Reads the input iterators and hands rows to the next step.
+ AbstractDataLoadProcessorStep inputProcessorStep =
+ new InputProcessorStepForPartitionImpl(configuration, inputIterators);
+ if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) {
+ AbstractDataLoadProcessorStep sortProcessorStep =
+ new SortProcessorStepImpl(configuration, inputProcessorStep);
+ // Writes the sorted data in carbondata format.
+ return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+ } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
+ // Sorts the data in batches by the sort columns
+ AbstractDataLoadProcessorStep sortProcessorStep =
+ new SortProcessorStepImpl(configuration, inputProcessorStep);
+ // Writes the sorted data in carbondata format.
+ return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
+ } else {
+ // All other cases (global sort and no sort) use this step
+ return new CarbonRowDataWriterProcessorStepImpl(configuration, inputProcessorStep);
+ }
+ }
+
private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
CarbonDataLoadConfiguration configuration) {
// 1. Reads the data input iterators and parses the data.
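
In brief, buildInternalForPartitionLoad picks the step chain from the sort scope. A schematic (not CarbonData code) using plain strings for the step names:

    // Which steps run for a partition load, per sort scope.
    def partitionLoadSteps(sortScope: String): Seq[String] = sortScope match {
      case "LOCAL_SORT" =>
        Seq("InputProcessorStepForPartitionImpl", "SortProcessorStepImpl",
          "DataWriterProcessorStepImpl")
      case "BATCH_SORT" =>
        Seq("InputProcessorStepForPartitionImpl", "SortProcessorStepImpl",
          "DataWriterBatchProcessorStepImpl")
      case _ => // GLOBAL_SORT and NO_SORT write directly
        Seq("InputProcessorStepForPartitionImpl", "CarbonRowDataWriterProcessorStepImpl")
    }
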
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 2d70f03..85eb19b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -47,6 +47,8 @@ public class MeasureFieldConverterImpl implements FieldConverter {
private boolean isEmptyBadRecord;
+ private DataField dataField;
+
public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
boolean isEmptyBadRecord) {
this.dataType = dataField.getColumn().getDataType();
@@ -54,6 +56,7 @@ public class MeasureFieldConverterImpl implements FieldConverter {
this.nullformat = nullformat;
this.index = index;
this.isEmptyBadRecord = isEmptyBadRecord;
+ this.dataField = dataField;
}
@Override
@@ -85,7 +88,11 @@ public class MeasureFieldConverterImpl implements FieldConverter {
row.update(null, index);
} else {
try {
- output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+ if (dataField.isUseActualData()) {
+ output = DataTypeUtil.getConvertedMeasureValueBasedOnDataType(value, dataType, measure);
+ } else {
+ output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+ }
row.update(output, index);
} catch (NumberFormatException e) {
LOGGER.warn(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index ced37dd..9cf7fe4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -68,14 +68,25 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
dateFormat = dataField.getTimestampFormat();
}
try {
- byte[] value = DataTypeUtil
- .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
- if (dataType == DataTypes.STRING
- && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
- throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
- + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+ if (!dataField.isUseActualData()) {
+ byte[] value = DataTypeUtil
+ .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
+ if (dataType == DataTypes.STRING
+ && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+ throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+ + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+ }
+ row.update(value, index);
+ } else {
+ Object value = DataTypeUtil
+ .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
+ if (dataType == DataTypes.STRING
+ && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+ throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+ + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+ }
+ row.update(value, index);
}
- row.update(value, index);
} catch (CarbonDataLoadingException e) {
throw e;
} catch (Throwable ex) {
@@ -99,7 +110,9 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
}
private void updateWithNullValue(CarbonRow row) {
- if (dataType == DataTypes.STRING) {
+ if (dataField.isUseActualData()) {
+ row.update(null, index);
+ } else if (dataType == DataTypes.STRING) {
row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
} else {
row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index);
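
To summarize the converter change: useActualData switches from the classic byte[] encoding to keeping a typed object, so downstream steps (e.g. a global sort) can operate on real values. A rough sketch of the two paths, using the DataTypeUtil helpers referenced above:

    import org.apache.carbondata.core.metadata.datatype.DataType
    import org.apache.carbondata.core.util.DataTypeUtil

    def convertNoDictionary(value: String, dataType: DataType, dateFormat: String,
        useActualData: Boolean): AnyRef =
      if (useActualData) {
        // New partition flow: keep the typed value (Int, Long, String, ...).
        DataTypeUtil.getDataDataTypeForNoDictionaryColumn(value, dataType, dateFormat)
      } else {
        // Classic flow: encode to the internal byte[] representation.
        DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(value, dataType, dateFormat)
      }
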
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 66943c8..9229598 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFactory;
* It is wrapper class to hold the rows in batches when record writer writes the data and allows
* to iterate on it during data load. It uses blocking queue to coordinate between read and write.
*/
-public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
+public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> {
private static final Log LOG = LogFactory.getLog(CarbonOutputIteratorWrapper.class);
@@ -46,7 +46,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
- public void write(String[] row) throws InterruptedException {
+ public void write(Object[] row) throws InterruptedException {
if (!loadBatch.addRow(row)) {
loadBatch.readyRead();
queue.put(loadBatch);
@@ -78,7 +78,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
}
@Override
- public String[] next() {
+ public Object[] next() {
return readBatch.next();
}
@@ -100,16 +100,16 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
}
}
- private static class RowBatch extends CarbonIterator<String[]> {
+ private static class RowBatch extends CarbonIterator<Object[]> {
private int counter;
- private String[][] batch;
+ private Object[][] batch;
private int size;
private RowBatch(int size) {
- batch = new String[size][];
+ batch = new Object[size][];
this.size = size;
}
@@ -118,7 +118,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
* @param row
* @return false if the row cannot be added as batch is full.
*/
- public boolean addRow(String[] row) {
+ public boolean addRow(Object[] row) {
batch[counter++] = row;
return counter < size;
}
@@ -134,7 +134,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
}
@Override
- public String[] next() {
+ public Object[] next() {
assert (counter < size);
return batch[counter++];
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index d41455f..4c536ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -188,6 +188,11 @@ public class CarbonLoadModel implements Serializable {
private boolean isAggLoadRequest;
+ /**
+ * It writes data directly to the no-sort processor, bypassing all other processors.
+ */
+ private boolean isPartitionLoad;
+
public boolean isAggLoadRequest() {
return isAggLoadRequest;
}
@@ -401,6 +406,7 @@ public class CarbonLoadModel implements Serializable {
copy.batchSortSizeInMb = batchSortSizeInMb;
copy.badRecordsLocation = badRecordsLocation;
copy.isAggLoadRequest = isAggLoadRequest;
+ copy.isPartitionLoad = isPartitionLoad;
return copy;
}
@@ -454,6 +460,7 @@ public class CarbonLoadModel implements Serializable {
copy.batchSortSizeInMb = batchSortSizeInMb;
copy.isAggLoadRequest = isAggLoadRequest;
copy.badRecordsLocation = badRecordsLocation;
+ copy.isPartitionLoad = isPartitionLoad;
return copy;
}
@@ -855,4 +862,12 @@ public class CarbonLoadModel implements Serializable {
public void setSkipEmptyLine(String skipEmptyLine) {
this.skipEmptyLine = skipEmptyLine;
}
+
+ public boolean isPartitionLoad() {
+ return isPartitionLoad;
+ }
+
+ public void setPartitionLoad(boolean partitionLoad) {
+ isPartitionLoad = partitionLoad;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
new file mode 100644
index 0000000..1dc9b27
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from record reader and sends data to next step.
+ */
+public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep {
+
+ private CarbonIterator<Object[]>[] inputIterators;
+
+ private boolean[] noDictionaryMapping;
+
+ private DataType[] dataTypes;
+
+ private int[] orderOfData;
+
+ public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration,
+ CarbonIterator<Object[]>[] inputIterators) {
+ super(configuration, null);
+ this.inputIterators = inputIterators;
+ }
+
+ @Override public DataField[] getOutput() {
+ return configuration.getDataFields();
+ }
+
+ @Override public void initialize() throws IOException {
+ super.initialize();
+ // Create a row converter up front; it is registered below as the cardinality finder.
+ RowConverterImpl rowConverter =
+ new RowConverterImpl(configuration.getDataFields(), configuration, null);
+ rowConverter.initialize();
+ configuration.setCardinalityFinder(rowConverter);
+ noDictionaryMapping =
+ CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+ dataTypes = new DataType[configuration.getDataFields().length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) {
+ dataTypes[i] = DataTypes.INT;
+ } else {
+ dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType();
+ }
+ }
+ orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
+ }
+
+ private int[] arrangeData(DataField[] dataFields, String[] header) {
+ int[] data = new int[dataFields.length];
+ for (int i = 0; i < dataFields.length; i++) {
+ for (int j = 0; j < header.length; j++) {
+ if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
+ data[i] = j;
+ break;
+ }
+ }
+ }
+ return data;
+ }
+
+ @Override public Iterator<CarbonRowBatch>[] execute() {
+ int batchSize = CarbonProperties.getInstance().getBatchSize();
+ List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+ Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+ for (int i = 0; i < outIterators.length; i++) {
+ outIterators[i] =
+ new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
+ rowCounter, orderOfData, noDictionaryMapping, dataTypes);
+ }
+ return outIterators;
+ }
+
+ /**
+ * Partition input iterators equally as per the number of threads.
+ *
+ * @return
+ */
+ private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+ // Get the number of cores configured in property.
+ int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+ // Get the minimum of number of cores and iterators size to get the number of parallel threads
+ // to be launched.
+ int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+
+ List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+ for (int i = 0; i < parallelThreadNumber; i++) {
+ iterators[i] = new ArrayList<>();
+ }
+ // Equally partition the iterators as per number of threads
+ for (int i = 0; i < inputIterators.length; i++) {
+ iterators[i % parallelThreadNumber].add(inputIterators[i]);
+ }
+ return iterators;
+ }
+
+ @Override protected CarbonRow processRow(CarbonRow row) {
+ return null;
+ }
+
+ @Override public void close() {
+ if (!closed) {
+ super.close();
+ for (CarbonIterator inputIterator : inputIterators) {
+ inputIterator.close();
+ }
+ }
+ }
+
+ @Override protected String getStepName() {
+ return "Input Processor";
+ }
+
+ /**
+ * This iterator wraps the list of iterators and iterates over each
+ * iterator of the list one by one. It also parses the data while iterating.
+ */
+ private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+ private List<CarbonIterator<Object[]>> inputIterators;
+
+ private CarbonIterator<Object[]> currentIterator;
+
+ private int counter;
+
+ private int batchSize;
+
+ private boolean nextBatch;
+
+ private boolean firstTime;
+
+ private AtomicLong rowCounter;
+
+ private boolean[] noDictionaryMapping;
+
+ private DataType[] dataTypes;
+
+ private int[] orderOfData;
+
+ public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
+ boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
+ DataType[] dataTypes) {
+ this.inputIterators = inputIterators;
+ this.batchSize = batchSize;
+ this.counter = 0;
+ // Get the first iterator from the list.
+ currentIterator = inputIterators.get(counter++);
+ this.rowCounter = rowCounter;
+ this.nextBatch = false;
+ this.firstTime = true;
+ this.noDictionaryMapping = noDictionaryMapping;
+ this.dataTypes = dataTypes;
+ this.orderOfData = orderOfData;
+ }
+
+ @Override public boolean hasNext() {
+ return nextBatch || internalHasNext();
+ }
+
+ private boolean internalHasNext() {
+ if (firstTime) {
+ firstTime = false;
+ currentIterator.initialize();
+ }
+ boolean hasNext = currentIterator.hasNext();
+ // If iterator is finished then check for next iterator.
+ if (!hasNext) {
+ currentIterator.close();
+ // Check next iterator is available in the list.
+ if (counter < inputIterators.size()) {
+ // Get the next iterator from the list.
+ currentIterator = inputIterators.get(counter++);
+ // Initialize the new iterator
+ currentIterator.initialize();
+ hasNext = internalHasNext();
+ }
+ }
+ return hasNext;
+ }
+
+ @Override public CarbonRowBatch next() {
+ return getBatch();
+ }
+
+ private CarbonRowBatch getBatch() {
+ // Create batch and fill it.
+ CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
+ int count = 0;
+ while (internalHasNext() && count < batchSize) {
+ carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next())));
+ count++;
+ }
+ rowCounter.getAndAdd(carbonRowBatch.getSize());
+ return carbonRowBatch;
+ }
+
+ private Object[] convertToNoDictionaryToBytes(Object[] data) {
+ Object[] newData = new Object[data.length];
+ for (int i = 0; i < noDictionaryMapping.length; i++) {
+ if (noDictionaryMapping[i]) {
+ newData[i] = DataTypeUtil
+ .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+ } else {
+ newData[i] = data[orderOfData[i]];
+ }
+ }
+ if (newData.length > noDictionaryMapping.length) {
+ for (int i = noDictionaryMapping.length; i < newData.length; i++) {
+ newData[i] = data[orderOfData[i]];
+ }
+ }
+ return newData;
+ }
+
+ }
+
+}
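
A quick note on partitionInputReaderIterators above: it spreads the input iterators round-robin over min(iterators, cores) threads. The same distribution in compact Scala, for intuition (the strings are stand-ins for iterators):

    // Distribute inputs over k buckets, round-robin.
    def roundRobin[T](inputs: Seq[T], cores: Int): Seq[Seq[T]] = {
      val k = math.min(inputs.length, cores)
      val buckets = Vector.fill(k)(Vector.newBuilder[T])
      inputs.zipWithIndex.foreach { case (in, i) => buckets(i % k) += in }
      buckets.map(_.result())
    }

    // roundRobin(Seq("it0", "it1", "it2", "it3", "it4"), 2)
    //   == Seq(Seq("it0", "it2", "it4"), Seq("it1", "it3"))
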
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 06522a4..b795696 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -558,6 +558,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
@Override public Void call() throws Exception {
try {
TablePage tablePage = processDataRows(dataRows);
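+ // Drop the reference so the processed batch can be garbage collected early.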
+ dataRows = null;
tablePage.setIsLastPage(isLastPage);
// insert the object in array according to sequence number
int indexInNodeHolderArray = (pageId - 1) % numberOfCores;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 2a4cc00..376a546 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -482,22 +482,19 @@ public final class CarbonDataProcessorUtil {
/**
* Get the number of partitions in global sort
- * @param configuration
+ * @param globalSortPartitions
* @return the number of partitions
*/
- public static int getGlobalSortPartitions(CarbonDataLoadConfiguration configuration) {
+ public static int getGlobalSortPartitions(Object globalSortPartitions) {
int numPartitions;
try {
// First try to get the number from ddl, otherwise get it from carbon properties.
- if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)
- == null) {
+ if (globalSortPartitions == null) {
numPartitions = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT));
} else {
- numPartitions = Integer.parseInt(
- configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)
- .toString());
+ numPartitions = Integer.parseInt(globalSortPartitions.toString());
}
} catch (Exception e) {
numPartitions = 0;
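
Finally, the reworked getGlobalSortPartitions just parses the DDL value when present and otherwise falls back to the carbon property. Equivalent logic as a hedged Scala sketch:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    def globalSortPartitions(fromDdl: AnyRef): Int =
      try {
        if (fromDdl == null) {
          CarbonProperties.getInstance
            .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
              CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT).toInt
        } else {
          fromDdl.toString.toInt
        }
      } catch {
        case _: Exception => 0 // same "invalid => 0" fallback as the Java helper
      }
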