Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:19:08 UTC
[11/16] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition
restructure for new folder structure and supporting partition location
feature
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index fc7c13a..22dab27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.util.CarbonUtil
@@ -98,6 +99,18 @@ case class CarbonCreateTableCommand(
val partitionString =
if (partitionInfo != null &&
partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ // Restrict dictionary encoding on partition columns.
+ // TODO Need to decide whether it is required
+ val dictionaryOnPartitionColumn =
+ partitionInfo.getColumnSchemaList.asScala.exists{p =>
+ p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY)
+ }
+ if (dictionaryOnPartitionColumn) {
+ throwMetadataException(
+ dbName,
+ tableName,
+ s"Dictionary include cannot be applied on partition columns")
+ }
s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
_.getColumnName).mkString(",")})"
} else {
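The new check rejects explicit dictionary encoding on partition columns of a standard (NATIVE_HIVE) partition table; date/timestamp columns are unaffected because they carry DIRECT_DICTIONARY. A minimal sketch of a statement that would now fail with the metadata exception (hypothetical table, using the standard DICTIONARY_INCLUDE table property):

    spark.sql(
      """CREATE TABLE sales (id INT, amount DOUBLE)
        |PARTITIONED BY (country STRING)
        |STORED BY 'carbondata'
        |TBLPROPERTIES ('DICTIONARY_INCLUDE'='country')""".stripMargin)
    // fails with: Dictionary include cannot be applied on partition columns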
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index d2c691b..bff65be 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources
import java.io.File
-import java.text.SimpleDateFormat
import java.util
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
@@ -39,16 +38,18 @@ import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types._
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
@@ -66,6 +67,10 @@ with Serializable {
None
}
+ SparkSession.getActiveSession.get.sessionState.conf.setConfString(
+ "spark.sql.sources.commitProtocolClass",
+ "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
override def prepareWrite(
sparkSession: SparkSession,
job: Job,
@@ -77,9 +82,6 @@ with Serializable {
classOf[CarbonOutputCommitter],
classOf[CarbonOutputCommitter])
conf.set("carbon.commit.protocol", "carbon.commit.protocol")
- sparkSession.sessionState.conf.setConfString(
- "spark.sql.sources.commitProtocolClass",
- "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
val table = CarbonEnv.getCarbonTable(
TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
@@ -114,13 +116,7 @@ with Serializable {
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
model.setPartitionLoad(true)
- // Set the update timestamp if user sets in case of update query. It needs to be updated
- // in load status update time
- val updateTimeStamp = options.get("updatetimestamp")
- if (updateTimeStamp.isDefined) {
- conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
- model.setFactTimeStamp(updateTimeStamp.get.toLong)
- }
+
val staticPartition = options.getOrElse("staticpartition", null)
if (staticPartition != null) {
conf.set("carbon.staticpartition", staticPartition)
@@ -131,6 +127,30 @@ with Serializable {
if (segemntsTobeDeleted.isDefined) {
conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
}
+
+ val currPartition = options.getOrElse("currentpartition", null)
+ if (currPartition != null) {
+ conf.set("carbon.currentpartition", currPartition)
+ }
+ // Update with the current in-progress load.
+ val currEntry = options.getOrElse("currentloadentry", null)
+ if (currEntry != null) {
+ val loadEntry =
+ ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
+ val details =
+ SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+ model.setSegmentId(loadEntry.getLoadName)
+ model.setFactTimeStamp(loadEntry.getLoadStartTime)
+ val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+ list.add(loadEntry)
+ model.setLoadMetadataDetails(list)
+ }
+ // Set the update timestamp if the user set one as part of an update query. It needs to be
+ // reflected in the load status update time.
+ val updateTimeStamp = options.get("updatetimestamp")
+ if (updateTimeStamp.isDefined) {
+ conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+ }
CarbonTableOutputFormat.setLoadModel(conf, model)
new OutputWriterFactory {
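The new "currentloadentry" option is expected to carry a serialized LoadMetadataDetails describing the in-progress load, from which the segment id and fact timestamp are restored above. A rough sketch of the caller side, assuming ObjectSerializationUtil offers the matching convertObjectToString counterpart (illustrative wiring, not part of this diff):

    // Hypothetical caller: pass the in-progress load entry through the writer options.
    val loadEntry = new LoadMetadataDetails()
    loadEntry.setLoadName("3")                        // segment id of this load
    loadEntry.setLoadStartTime(System.currentTimeMillis())
    val options = Map(
      "currentloadentry" -> ObjectSerializationUtil.convertObjectToString(loadEntry))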
@@ -146,13 +166,14 @@ with Serializable {
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
+ val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
var storeLocation: Array[String] = Array[String]()
val isCarbonUseLocalDir = CarbonProperties.getInstance()
.getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
- val taskNumber = generateTaskNumber(path, context)
+ val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
val tmpLocationSuffix =
File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
if (isCarbonUseLocalDir) {
@@ -174,7 +195,7 @@ with Serializable {
storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
}
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
- new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber)
+ new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
}
/**
@@ -182,7 +203,7 @@ with Serializable {
* of partition tables.
*/
private def generateTaskNumber(path: String,
- context: TaskAttemptContext): String = {
+ context: TaskAttemptContext, segmentId: String): String = {
var partitionNumber: java.lang.Long = taskIdMap.get(path)
if (partitionNumber == null) {
partitionNumber = counter.incrementAndGet()
@@ -190,8 +211,7 @@ with Serializable {
taskIdMap.put(path, partitionNumber)
}
val taskID = context.getTaskAttemptID.getTaskID.getId
- String.valueOf(Math.pow(10, 5).toInt + taskID) +
- String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
+ CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
}
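generateTaskNumber now delegates to CarbonScalaUtil.generateUniqueNumber so the task number is unique across segments as well, not only within one load. The helper's implementation is not part of this diff; a hypothetical sketch of the idea (padding widths assumed):

    // Sketch only: fold the segment id in between the padded task id and the
    // padded per-path partition counter to avoid cross-segment collisions.
    def generateUniqueNumber(taskId: Int, segmentId: String, partitionNumber: java.lang.Long): String = {
      String.valueOf(Math.pow(10, 5).toInt + taskId) +
        segmentId +
        String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
    }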
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -233,9 +253,12 @@ private trait AbstractCarbonOutputWriter {
private class CarbonOutputWriter(path: String,
context: TaskAttemptContext,
fieldTypes: Seq[DataType],
- taskNo : String)
+ taskNo : String,
+ model: CarbonLoadModel)
extends OutputWriter with AbstractCarbonOutputWriter {
- val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+
+ val converter = new DataTypeConverterImpl
+
val partitions =
getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
val staticPartition: util.HashMap[String, Boolean] = {
@@ -247,47 +270,52 @@ private class CarbonOutputWriter(path: String,
null
}
}
- lazy val (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
- val updatedPartitions = partitions.map{ p =>
- val value = p.substring(p.indexOf("=") + 1, p.length)
- val col = p.substring(0, p.indexOf("="))
- // NUll handling case. For null hive creates with this special name
- if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
- (col, null)
- // we should replace back the special string with empty value.
- } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
- (col, "")
- } else {
- (col, value)
- }
- }
-
- if (staticPartition != null) {
- val loadModel = recordWriter.getLoadModel
- val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
- var timeStampformatString = loadModel.getTimestampformat
- if (timeStampformatString.isEmpty) {
- timeStampformatString = loadModel.getDefaultTimestampFormat
- }
- val timeFormat = new SimpleDateFormat(timeStampformatString)
- var dateFormatString = loadModel.getDateFormat
- if (dateFormatString.isEmpty) {
- dateFormatString = loadModel.getDefaultDateFormat
- }
- val dateFormat = new SimpleDateFormat(dateFormatString)
- val formattedPartitions = updatedPartitions.map {case (col, value) =>
- // Only convert the static partitions to the carbon format and use it while loading data
- // to carbon.
- (col, value)
- }
- (formattedPartitions, updatePartitions(formattedPartitions.map(_._2)))
+ lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
+ val currParts = context.getConfiguration.get("carbon.currentpartition")
+ if (currParts != null) {
+ ObjectSerializationUtil.convertStringToObject(
+ currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
} else {
- (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+ new util.ArrayList[indexstore.PartitionSpec]()
}
+ }
+ var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+ val updatedPartitions = partitions.map(splitPartition)
+ (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
} else {
(Map.empty[String, String].toArray, Array.empty)
}
+ private def splitPartition(p: String) = {
+ val value = p.substring(p.indexOf("=") + 1, p.length)
+ val col = p.substring(0, p.indexOf("="))
+ // Null handling case. For null values Hive creates the partition with this special name
+ if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
+ (col, null)
+ // we should replace the special string back with an empty value.
+ } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+ (col, "")
+ } else {
+ (col, value)
+ }
+ }
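splitPartition undoes Hive's encoding of partition folder names; a quick illustration of the three cases (example values):

    splitPartition("country=us")                          // ("country", "us")
    splitPartition("country=__HIVE_DEFAULT_PARTITION__")  // ("country", null), Hive's null marker
    // the carbon default-value marker maps back to the empty string
    splitPartition("country=" + CarbonCommonConstants.MEMBER_DEFAULT_VAL)  // ("country", "")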
+
+ lazy val writePath = {
+ val updatedPath = getPartitionPath(path, context, model)
+ // In case the user specified a partition location, search the current partitions
+ // to find the corresponding partition spec.
+ if (partitions.isEmpty) {
+ val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
+ val index = currPartitions.indexOf(writeSpec)
+ if (index > -1) {
+ val spec = currPartitions.get(index)
+ updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
+ partitionData = updatePartitions(updatedPartitions.map(_._2))
+ }
+ }
+ updatedPath
+ }
+
val writable = new ObjectArrayWritable
private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
@@ -302,21 +330,30 @@ private class CarbonOutputWriter(path: String,
} else {
col.getDataType
}
- if (staticPartition != null) {
- DataTypeUtil.getDataBasedOnDataType(
+ if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
+ val convertedVal =
CarbonScalaUtil.convertStaticPartitions(
partitionData(index),
col,
- model.getCarbonDataLoadSchema.getCarbonTable),
- dataType)
+ model.getCarbonDataLoadSchema.getCarbonTable)
+ if (col.hasEncoding(Encoding.DICTIONARY)) {
+ convertedVal.toInt.asInstanceOf[AnyRef]
+ } else {
+ DataTypeUtil.getDataBasedOnDataType(
+ convertedVal,
+ dataType,
+ converter)
+ }
} else {
- DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType)
+ DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
}
}.toArray
}
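For a static partition on a dictionary-encoded column the converted value is now used directly as the integer surrogate key instead of going through DataTypeUtil. A small illustration of that branch, assuming convertStaticPartitions returns the surrogate as a string (which the toInt above implies):

    // e.g. a dictionary column whose static partition value resolved to surrogate key "5"
    val converted = "5"
    val cell: AnyRef =
      if (col.hasEncoding(Encoding.DICTIONARY)) converted.toInt.asInstanceOf[AnyRef]
      else DataTypeUtil.getDataBasedOnDataType(converted, dataType, converter)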
private val recordWriter: CarbonRecordWriter = {
context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
+ context.getConfiguration.set("carbon.outputformat.writepath",
+ writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
new CarbonTableOutputFormat() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
new Path(path)
@@ -355,51 +392,54 @@ private class CarbonOutputWriter(path: String,
override def close(): Unit = {
recordWriter.close(context)
- val loadModel = recordWriter.getLoadModel
- val segmentPath = CarbonTablePath.getSegmentPath(loadModel.getTablePath, loadModel.getSegmentId)
- val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
- var timeStampformatString = loadModel.getTimestampformat
- if (timeStampformatString.isEmpty) {
- timeStampformatString = loadModel.getDefaultTimestampFormat
- }
- val timeFormat = new SimpleDateFormat(timeStampformatString)
- var dateFormatString = loadModel.getDateFormat
- if (dateFormatString.isEmpty) {
- dateFormatString = loadModel.getDefaultDateFormat
- }
- val dateFormat = new SimpleDateFormat(dateFormatString)
- val serializeFormat =
- loadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- val badRecordAction = loadModel.getBadRecordsAction.split(",")(1)
- val isEmptyBadRecord = loadModel.getIsEmptyDataBadRecord.split(",")(1).toBoolean
// write partition info to new file.
val partitonList = new util.ArrayList[String]()
val formattedPartitions =
// All dynamic partitions need to be converted to proper format
CarbonScalaUtil.updatePartitions(
updatedPartitions.toMap,
- table,
- timeFormat,
- dateFormat)
+ model.getCarbonDataLoadSchema.getCarbonTable)
formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
- new PartitionMapFileStore().writePartitionMapFile(
- segmentPath,
- loadModel.getTaskNo,
+ SegmentFileStore.writeSegmentFile(
+ model.getTablePath,
+ taskNo,
+ writePath,
+ model.getSegmentId + "_" + model.getFactTimeStamp + "",
partitonList)
}
+ def getPartitionPath(path: String,
+ attemptContext: TaskAttemptContext,
+ model: CarbonLoadModel): String = {
+ if (updatedPartitions.nonEmpty) {
+ val formattedPartitions =
+ // All dynamic partitions need to be converted to proper format
+ CarbonScalaUtil.updatePartitions(
+ updatedPartitions.toMap,
+ model.getCarbonDataLoadSchema.getCarbonTable)
+ val partitionstr = formattedPartitions.map{p =>
+ ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
+ }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
+ model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
+ CarbonCommonConstants.FILE_SEPARATOR + partitionstr
+ } else {
+ var updatedPath = FileFactory.getUpdatedFilePath(path)
+ updatedPath.substring(0, updatedPath.lastIndexOf("/"))
+ }
+ }
+
def getPartitionsFromPath(
path: String,
attemptContext: TaskAttemptContext,
model: CarbonLoadModel): Array[String] = {
var attemptId = attemptContext.getTaskAttemptID.toString + "/"
- if (path.indexOf(attemptId) <= 0) {
-
- attemptId = model.getTableName + "/"
- }
- val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
- if (str.length > 0) {
- str.split("/")
+ if (path.indexOf(attemptId) > -1) {
+ val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
+ if (str.length > 0) {
+ str.split("/")
+ } else {
+ Array.empty
+ }
} else {
Array.empty
}
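getPartitionsFromPath now returns an empty array whenever the task attempt id is absent from the path (for example when writing to a user-specified partition location), instead of falling back to splitting on the table name. An illustrative input/output pair (hypothetical paths):

    // attempt id present: the folders between it and the file name are the partitions
    //   .../attempt_201802270819_0001_m_000000_0/country=us/city=ny/part-0
    //   => Array("country=us", "city=ny")
    // attempt id absent (custom partition location) => Array.empty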
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 544c494..48679b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.schema.BucketingInfo
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
@@ -143,10 +144,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Array[Filter],
- ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = {
+ ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow]) = {
val names = relation.catalogTable.get.partitionColumnNames
// Get the current partitions from table.
- var partitions: Seq[String] = null
+ var partitions: Seq[PartitionSpec] = null
if (names.nonEmpty) {
val partitionSet = AttributeSet(names
.map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
@@ -167,7 +168,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
CarbonFilters.getPartitions(
updatedPartitionFilters.toSeq,
SparkSession.getActiveSession.get,
- relation.catalogTable.get.identifier)
+ relation.catalogTable.get.identifier).orNull
}
pruneFilterProjectRaw(
relation,
@@ -199,9 +200,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
relation: LogicalRelation,
rawProjects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
- partitions: Seq[String],
+ partitions: Seq[PartitionSpec],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
- ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = {
+ ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow]) = {
val projects = rawProjects.map {p =>
p.transform {
case CustomDeterministicExpression(exp) => exp
@@ -350,9 +351,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
private def getDataSourceScan(relation: LogicalRelation,
output: Seq[Attribute],
- partitions: Seq[String],
+ partitions: Seq[PartitionSpec],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
- ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow],
+ ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
candidatePredicates: Seq[Expression],
pushedFilters: Seq[Filter],
metadata: Map[String, String],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 0178716..f69ccc1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
-import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand}
+import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand}
import org.apache.spark.sql.execution.command.schema._
import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
@@ -247,16 +247,19 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
} else {
ExecutedCommandExec(rename) :: Nil
}
- case addPartition@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, _) =>
+ case addPart@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists) =>
val dbOption = tableName.database.map(_.toLowerCase)
val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption)
val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(tableIdentifier)(sparkSession)
- if (isCarbonTable && partitionSpecsAndLocs.exists(_._2.isDefined)) {
- throw new UnsupportedOperationException(
- "add partition with location is not supported")
+ if (isCarbonTable) {
+ ExecutedCommandExec(
+ CarbonAlterTableAddHivePartitionCommand(
+ tableName,
+ partitionSpecsAndLocs,
+ ifNotExists)) :: Nil
} else {
- ExecutedCommandExec(addPartition) :: Nil
+ ExecutedCommandExec(addPart) :: Nil
}
case RefreshTable(tableIdentifier) =>
RefreshCarbonTableCommand(tableIdentifier.database,
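With this routing, ALTER TABLE ... ADD PARTITION on a carbon table is handled by CarbonAlterTableAddHivePartitionCommand, so a user-specified LOCATION is no longer rejected with UnsupportedOperationException. A minimal sketch of the now-supported statement (hypothetical table and path):

    spark.sql(
      """ALTER TABLE sales ADD IF NOT EXISTS
        |PARTITION (country='us')
        |LOCATION '/custom/warehouse/country=us'""".stripMargin)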
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index b8608f4..7bf8536 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -29,11 +29,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CarbonException
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
/**
@@ -209,9 +212,10 @@ case class CarbonRelation(
.getValidAndInvalidSegments.getValidSegments.isEmpty) {
sizeInBytesLocalValue = 0L
} else {
- val tablePath = CarbonStorePath.getCarbonTablePath(
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(
carbonTable.getTablePath,
- carbonTable.getCarbonTableIdentifier).getPath
+ carbonTable.getCarbonTableIdentifier)
+ val tablePath = carbonTablePath.getPath
val fileType = FileFactory.getFileType(tablePath)
if (FileFactory.isFileExist(tablePath, fileType)) {
// get the valid segments
@@ -220,8 +224,14 @@ case class CarbonRelation(
var size = 0L
// for each segment calculate the size
segments.foreach {validSeg =>
- size = size + FileFactory.getDirectorySize(
- CarbonTablePath.getSegmentPath(tablePath, validSeg))
+ if (validSeg.getSegmentFileName != null) {
+ val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName)
+ size = size + CarbonUtil.getSizeOfSegment(
+ carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
+ } else {
+ size = size + FileFactory.getDirectorySize(
+ CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))
+ }
}
// update the new table status time
tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index c7767ce..e5eb53c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.optimizer
+import java.util
+
import scala.collection.JavaConverters._
import org.apache.spark.sql._
@@ -30,14 +32,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.apache.carbondata.core.scan.expression.conditional._
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -432,15 +435,11 @@ object CarbonFilters {
}
def getCurrentPartitions(sparkSession: SparkSession,
- carbonTable: CarbonTable): Seq[String] = {
- if (carbonTable.isHivePartitionTable) {
- CarbonFilters.getPartitions(
- Seq.empty,
- sparkSession,
- TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
- } else {
- Seq.empty
- }
+ tableIdentifier: TableIdentifier): Option[Seq[PartitionSpec]] = {
+ CarbonFilters.getPartitions(
+ Seq.empty,
+ sparkSession,
+ tableIdentifier)
}
/**
@@ -452,11 +451,15 @@ object CarbonFilters {
*/
def getPartitions(partitionFilters: Seq[Expression],
sparkSession: SparkSession,
- identifier: TableIdentifier): Seq[String] = {
+ identifier: TableIdentifier): Option[Seq[PartitionSpec]] = {
+ val table = CarbonEnv.getCarbonTable(identifier)(sparkSession)
// first try to read partitions in case if the trigger comes from the aggregation table load.
- val partitionsForAggTable = getPartitionsForAggTable(sparkSession, identifier)
+ val partitionsForAggTable = getPartitionsForAggTable(sparkSession, table)
if (partitionsForAggTable.isDefined) {
- return partitionsForAggTable.get
+ return partitionsForAggTable
+ }
+ if (!table.isHivePartitionTable) {
+ return None
}
val partitions = {
try {
@@ -483,35 +486,33 @@ object CarbonFilters {
identifier)
}
}
- partitions.toList.flatMap { partition =>
- partition.spec.seq.map{case (column, value) => column + "=" + value}
- }.toSet.toSeq
+ Some(partitions.map { partition =>
+ new PartitionSpec(
+ new util.ArrayList[String]( partition.spec.seq.map{case (column, value) =>
+ column + "=" + value}.toList.asJava), partition.location)
+ })
}
/**
* In case of loading aggregate tables it needs to be get only from the main table load in
- * progress segment. So we should read from the partition map file of that segment.
+ * progress segment. So we should read from the segment file of that segment.
*/
def getPartitionsForAggTable(sparkSession: SparkSession,
- identifier: TableIdentifier): Option[Seq[String]] = {
+ table: CarbonTable): Option[Seq[PartitionSpec]] = {
// when validate segments is disabled then only read from partitionmap
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
val validateSegments = carbonSessionInfo.getSessionParams
.getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
- CarbonEnv.getDatabaseName(identifier.database)(sparkSession) + "." +
- identifier.table, "true").toBoolean
+ table.getDatabaseName + "." +
+ table.getTableName, "true").toBoolean
if (!validateSegments) {
val segmentNumbersFromProperty = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- CarbonEnv.getDatabaseName(identifier.database)(sparkSession)
- + "." + identifier.table)
- val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
- val segmentPath =
- CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNumbersFromProperty)
- val partitionMapper = new PartitionMapFileStore()
- partitionMapper.readAllPartitionsOfSegment(segmentPath)
- Some(partitionMapper.getPartitionMap.asScala.map(_._2).flatMap(_.asScala).toSet.toSeq)
+ table.getDatabaseName + "." + table.getTableName)
+ val segment = Segment.toSegment(segmentNumbersFromProperty)
+ val segmentFile = new SegmentFileStore(table.getTablePath, segment.getSegmentFileName)
+ Some(segmentFile.getPartitionSpecs.asScala)
} else {
None
}
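This branch is taken only while segment validation is disabled for the table, as happens during aggregate table loads. A sketch of the session settings that drive it, assuming the usual "validate.carbon.input.segments.<db>.<table>" and "carbon.input.segments.<db>.<table>" property keys (hypothetical table name):

    spark.sql("SET validate.carbon.input.segments.default.maintable=false")
    spark.sql("SET carbon.input.segments.default.maintable=3")  // read only in-progress segment 3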
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 0b62e10..d45020b 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Experime
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
@@ -146,14 +146,8 @@ class CarbonSessionCatalog(
ignoreIfExists: Boolean): Unit = {
try {
val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
- // Get the properties from thread local
- val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
- if (carbonSessionInfo != null) {
- val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
- super.createPartitions(tableName, updatedParts, ignoreIfExists)
- } else {
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
+ val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+ super.createPartitions(tableName, updatedParts, ignoreIfExists)
} catch {
case e: Exception =>
super.createPartitions(tableName, parts, ignoreIfExists)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index baadd04..b9425d6 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -144,14 +144,8 @@ class CarbonSessionCatalog(
ignoreIfExists: Boolean): Unit = {
try {
val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
- // Get the properties from thread local
- val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
- if (carbonSessionInfo != null) {
- val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
- super.createPartitions(tableName, updatedParts, ignoreIfExists)
- } else {
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
+ val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+ super.createPartitions(tableName, updatedParts, ignoreIfExists)
} catch {
case e: Exception =>
super.createPartitions(tableName, parts, ignoreIfExists)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 4b0113c..d739f8c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -71,7 +72,7 @@ public class DataMapWriterListener {
}
List<String> columns = factory.getMeta().getIndexedColumns();
List<DataMapWriter> writers = registry.get(columns);
- DataMapWriter writer = factory.createWriter(segmentId);
+ DataMapWriter writer = factory.createWriter(new Segment(segmentId, null));
if (writers != null) {
writers.add(writer);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7b1ab9d..b7270b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -109,6 +109,11 @@ public class CarbonDataLoadConfiguration {
*/
private short writingCoresCount;
+ /**
+ * Folder path to which data should be written for this load.
+ */
+ private String dataWritePath;
+
public CarbonDataLoadConfiguration() {
}
@@ -363,4 +368,12 @@ public class CarbonDataLoadConfiguration {
public void setWritingCoresCount(short writingCoresCount) {
this.writingCoresCount = writingCoresCount;
}
+
+ public String getDataWritePath() {
+ return dataWritePath;
+ }
+
+ public void setDataWritePath(String dataWritePath) {
+ this.dataWritePath = dataWritePath;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index ba24d41..f5b29e7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -57,7 +57,8 @@ public final class DataLoadProcessBuilder {
CarbonIterator[] inputIterators) throws Exception {
CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
- if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
+ if ((!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT))
+ && !loadModel.isPartitionLoad()) {
return buildInternalForNoSort(inputIterators, configuration);
} else if (configuration.getBucketingInfo() != null) {
return buildInternalForBucketing(inputIterators, configuration);
@@ -251,6 +252,7 @@ public final class DataLoadProcessBuilder {
configuration.setPreFetch(loadModel.isPreFetch());
configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
+ configuration.setDataWritePath(loadModel.getDataWritePath());
// For partition loading always use single core as it already runs in multiple
// threads per partition
if (carbonTable.isHivePartitionTable()) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 85eb19b..6664a2c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -89,7 +89,7 @@ public class MeasureFieldConverterImpl implements FieldConverter {
} else {
try {
if (dataField.isUseActualData()) {
- output = DataTypeUtil.getConvertedMeasureValueBasedOnDataType(value, dataType, measure);
+ output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure, true);
} else {
output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 4c536ea..a17178a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -193,6 +193,11 @@ public class CarbonLoadModel implements Serializable {
*/
private boolean isPartitionLoad;
+ /**
+ * Folder path to which data should be written for this load.
+ */
+ private String dataWritePath;
+
public boolean isAggLoadRequest() {
return isAggLoadRequest;
}
@@ -870,4 +875,12 @@ public class CarbonLoadModel implements Serializable {
public void setPartitionLoad(boolean partitionLoad) {
isPartitionLoad = partitionLoad;
}
+
+ public String getDataWritePath() {
+ return dataWritePath;
+ }
+
+ public void setDataWritePath(String dataWritePath) {
+ this.dataWritePath = dataWritePath;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index be27866..3f1430d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -26,6 +26,7 @@ import java.util.*;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
@@ -280,7 +282,7 @@ public final class CarbonDataMergerUtil {
*/
public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
- CompactionType compactionType) throws IOException {
+ CompactionType compactionType, String segmentFile) throws IOException {
boolean tableStatusUpdationStatus = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -325,6 +327,7 @@ public final class CarbonDataMergerUtil {
loadMetadataDetails.setLoadEndTime(loadEnddate);
CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
loadMetadataDetails.setLoadName(mergedLoadNumber);
+ loadMetadataDetails.setSegmentFile(segmentFile);
CarbonLoaderUtil
.addDataIndexSizeIntoMetaEntry(loadMetadataDetails, mergedLoadNumber, carbonTable);
loadMetadataDetails.setLoadStartTime(carbonLoadModel.getFactTimeStamp());
@@ -385,7 +388,7 @@ public final class CarbonDataMergerUtil {
*/
public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
CarbonLoadModel carbonLoadModel, long compactionSize,
- List<LoadMetadataDetails> segments, CompactionType compactionType) {
+ List<LoadMetadataDetails> segments, CompactionType compactionType) throws IOException {
String tablePath = carbonLoadModel.getTablePath();
Map<String, String> tableLevelProperties = carbonLoadModel.getCarbonDataLoadSchema()
.getCarbonTable().getTableInfo().getFactTable().getTableProperties();
@@ -590,13 +593,13 @@ public final class CarbonDataMergerUtil {
*/
private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
- CarbonLoadModel carbonLoadModel, String tablePath) {
+ CarbonLoadModel carbonLoadModel, String tablePath) throws IOException {
List<LoadMetadataDetails> segmentsToBeMerged =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- CarbonTableIdentifier tableIdentifier =
- carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
+ CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+ CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier();
// total length
@@ -612,8 +615,15 @@ public final class CarbonDataMergerUtil {
String segId = segment.getLoadName();
// variable to store one segment size across partition.
- long sizeOfOneSegmentAcrossPartition =
- getSizeOfSegment(tablePath, tableIdentifier, segId);
+ long sizeOfOneSegmentAcrossPartition;
+ if (segment.getSegmentFile() != null) {
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
+ sizeOfOneSegmentAcrossPartition = CarbonUtil
+ .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile()));
+ } else {
+ sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId);
+ }
// if size of a segment is greater than the Major compaction size. then ignore it.
if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
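The major compaction size is configured in MB, hence the two multiplications by 1024; a quick unit check (example figure only):

    // compactionSize = 1024 MB => threshold of 1 GB in bytes
    val thresholdBytes = 1024L * 1024 * 1024  // segments larger than this are skipped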
@@ -863,18 +873,18 @@ public final class CarbonDataMergerUtil {
* @param loadMetadataDetails
* @return
*/
- public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
- StringBuilder builder = new StringBuilder();
+ public static List<Segment> getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
+ List<Segment> segments = new ArrayList<>();
for (LoadMetadataDetails segment : loadMetadataDetails) {
//check if this load is an already merged load.
if (null != segment.getMergedLoadName()) {
- builder.append(segment.getMergedLoadName()).append(",");
+
+ segments.add(Segment.toSegment(segment.getMergedLoadName()));
} else {
- builder.append(segment.getLoadName()).append(",");
+ segments.add(Segment.toSegment(segment.getLoadName()));
}
}
- builder.deleteCharAt(builder.length() - 1);
- return builder.toString();
+ return segments;
}
/**
@@ -883,7 +893,7 @@ public final class CarbonDataMergerUtil {
* @param absoluteTableIdentifier
* @return
*/
- public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
+ public static List<Segment> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
throws IOException {
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
@@ -933,7 +943,8 @@ public final class CarbonDataMergerUtil {
int numberUpdateDeltaFilesThreshold =
CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
for (LoadMetadataDetails seg : segments) {
- if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(),
+ if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(
+ new Segment(seg.getLoadName(), seg.getSegmentFile()),
absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(),
numberUpdateDeltaFilesThreshold)) {
validSegments.add(seg);
@@ -950,12 +961,12 @@ public final class CarbonDataMergerUtil {
/**
* method gets the segments list which get qualified for IUD compaction.
- * @param Segments
+ * @param segments
* @param absoluteTableIdentifier
* @param compactionTypeIUD
* @return
*/
- public static List<String> getSegListIUDCompactionQualified(List<String> Segments,
+ public static List<String> getSegListIUDCompactionQualified(List<Segment> segments,
AbsoluteTableIdentifier absoluteTableIdentifier,
SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) {
@@ -964,8 +975,8 @@ public final class CarbonDataMergerUtil {
if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
int numberDeleteDeltaFilesThreshold =
CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
- List<String> deleteSegments = new ArrayList<>();
- for (String seg : Segments) {
+ List<Segment> deleteSegments = new ArrayList<>();
+ for (Segment seg : segments) {
if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
numberDeleteDeltaFilesThreshold)) {
deleteSegments.add(seg);
@@ -975,7 +986,7 @@ public final class CarbonDataMergerUtil {
// This Code Block Append the Segname along with the Blocks selected for Merge instead of
// only taking the segment name. This will help to parallelize better for each block
// in case of Delete Horizontal Compaction.
- for (String segName : deleteSegments) {
+ for (Segment segName : deleteSegments) {
List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
numberDeleteDeltaFilesThreshold);
validSegments.addAll(tempSegments);
@@ -984,10 +995,10 @@ public final class CarbonDataMergerUtil {
} else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) {
int numberUpdateDeltaFilesThreshold =
CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
- for (String seg : Segments) {
+ for (Segment seg : segments) {
if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager,
numberUpdateDeltaFilesThreshold)) {
- validSegments.add(seg);
+ validSegments.add(seg.getSegmentNo());
}
}
}
@@ -1027,7 +1038,7 @@ public final class CarbonDataMergerUtil {
* @param numberDeltaFilesThreshold
* @return
*/
- public static Boolean checkUpdateDeltaFilesInSeg(String seg,
+ public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
AbsoluteTableIdentifier absoluteTableIdentifier,
SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
@@ -1036,14 +1047,14 @@ public final class CarbonDataMergerUtil {
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg.getSegmentNo());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
updateDeltaFiles = segmentUpdateStatusManager
- .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT,
- false, allSegmentFiles);
+ .getUpdateDeltaFilesForSegment(seg.getSegmentNo(), true,
+ CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, false, allSegmentFiles);
if (updateDeltaFiles == null) {
return false;
@@ -1079,11 +1090,12 @@ public final class CarbonDataMergerUtil {
* @param numberDeltaFilesThreshold
* @return
*/
- private static boolean checkDeleteDeltaFilesInSeg(String seg,
+ private static boolean checkDeleteDeltaFilesInSeg(Segment seg,
SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
Set<String> uniqueBlocks = new HashSet<String>();
- List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
+ List<String> blockNameList =
+ segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
for (final String blockName : blockNameList) {
@@ -1121,11 +1133,12 @@ public final class CarbonDataMergerUtil {
* @return
*/
- private static List<String> getDeleteDeltaFilesInSeg(String seg,
+ private static List<String> getDeleteDeltaFilesInSeg(Segment seg,
SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
List<String> blockLists = new ArrayList<>();
- List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
+ List<String> blockNameList =
+ segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
for (final String blockName : blockNameList) {
@@ -1133,7 +1146,7 @@ public final class CarbonDataMergerUtil {
segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
if (deleteDeltaFiles.length > numberDeltaFilesThreshold) {
- blockLists.add(seg + "/" + blockName);
+ blockLists.add(seg.getSegmentNo() + "/" + blockName);
}
}
return blockLists;
@@ -1177,7 +1190,7 @@ public final class CarbonDataMergerUtil {
segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
CarbonFile[] deleteDeltaFiles =
- segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
+ segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName);
String destFileName =
blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 2480a39..2fbdf4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -26,7 +26,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.PartitionMapFileStore;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -35,7 +36,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
@@ -129,19 +129,20 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
*/
private SortIntermediateFileMerger intermediateFileMerger;
- private List<String> partitionNames;
+ private PartitionSpec partitionSpec;
+
private SortParameters sortParameters;
public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
SegmentProperties segmentProperties, CompactionType compactionType, String tableName,
- List<String> partitionNames) {
+ PartitionSpec partitionSpec) {
this.carbonLoadModel = carbonLoadModel;
this.carbonTable = carbonTable;
this.segmentProperties = segmentProperties;
this.segmentId = carbonLoadModel.getSegmentId();
this.compactionType = compactionType;
this.tableName = tableName;
- this.partitionNames = partitionNames;
+ this.partitionSpec = partitionSpec;
}
/**
@@ -168,14 +169,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
} catch (Exception e) {
LOGGER.error(e, "Compaction failed: " + e.getMessage());
} finally {
- if (partitionNames != null) {
+ if (partitionSpec != null) {
try {
- new PartitionMapFileStore().writePartitionMapFile(
- CarbonTablePath.getSegmentPath(
- carbonLoadModel.getTablePath(),
- carbonLoadModel.getSegmentId()),
- carbonLoadModel.getTaskNo(),
- partitionNames);
+ SegmentFileStore
+ .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(),
+ partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
+ partitionSpec.getPartitions());
} catch (IOException e) {
LOGGER.error(e, "Compaction failed: " + e.getMessage());
isCompactionSuccess = false;
@@ -401,9 +400,19 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
* initialise carbon data writer instance
*/
private void initDataHandler() throws Exception {
+ String carbonStoreLocation;
+ if (partitionSpec != null) {
+ carbonStoreLocation =
+ partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR
+ + carbonLoadModel.getFactTimeStamp() + ".tmp";
+ } else {
+ carbonStoreLocation = CarbonDataProcessorUtil
+ .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(),
+ tableName, carbonLoadModel.getPartitionId(), carbonLoadModel.getSegmentId());
+ }
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
- tempStoreLocation);
+ tempStoreLocation, carbonStoreLocation);
setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
carbonFactDataHandlerModel);
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
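
The new write-path selection in initDataHandler() is the heart of the restructure, and the same branch recurs in RowResultMergerProcessor and RowResultProcessor below: partitioned output goes into a hidden <factTimeStamp>.tmp folder under the partition's own location and becomes visible only when the segment file is committed, while non-partitioned output keeps the conventional segment directory. A condensed sketch (the helper name and class wrapper are ours; the calls are the ones visible in the hunk):

    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    import org.apache.carbondata.core.indexstore.PartitionSpec;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

    final class StoreLocationSketch {
      static String chooseStoreLocation(PartitionSpec spec, CarbonLoadModel model,
          CarbonTable table, String tableName) {
        if (spec != null) {
          // Partitioned: temporary folder inside the partition location.
          return spec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR
              + model.getFactTimeStamp() + ".tmp";
        }
        // Non-partitioned: the classic segment directory. After this patch
        // createCarbonStoreLocation only computes the path; it no longer
        // creates the folder.
        return CarbonDataProcessorUtil.createCarbonStoreLocation(table.getTablePath(),
            model.getDatabaseName(), tableName, model.getPartitionId(),
            model.getSegmentId());
      }
    }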
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 3d0700b..b41829f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -24,18 +24,19 @@ import java.util.PriorityQueue;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.PartitionMapFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.exception.SliceMergerException;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
@@ -51,7 +52,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
private CarbonFactHandler dataHandler;
private SegmentProperties segprop;
private CarbonLoadModel loadModel;
- private List<String> partitionNames;
+ private PartitionSpec partitionSpec;
/**
* record holder heap
*/
@@ -62,16 +63,26 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
public RowResultMergerProcessor(String databaseName,
String tableName, SegmentProperties segProp, String[] tempStoreLocation,
- CarbonLoadModel loadModel, CompactionType compactionType, List<String> partitionNames) {
+ CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) {
this.segprop = segProp;
- this.partitionNames = partitionNames;
+ this.partitionSpec = partitionSpec;
this.loadModel = loadModel;
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+ String carbonStoreLocation;
+ if (partitionSpec != null) {
+ carbonStoreLocation =
+ partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
+ .getFactTimeStamp() + ".tmp";
+ } else {
+ carbonStoreLocation = CarbonDataProcessorUtil
+ .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
+ tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+ }
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
- tempStoreLocation);
+ tempStoreLocation, carbonStoreLocation);
setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
carbonFactDataHandlerModel);
carbonFactDataHandlerModel.setCompactionFlow(true);
@@ -157,14 +168,13 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
if (isDataPresent) {
this.dataHandler.closeHandler();
}
- if (partitionNames != null) {
- new PartitionMapFileStore().writePartitionMapFile(
- CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()),
- loadModel.getTaskNo(),
- partitionNames);
+ if (partitionSpec != null) {
+ SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
+ partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
+ partitionSpec.getPartitions());
}
} catch (CarbonDataWriterException | IOException e) {
- LOGGER.error(e,"Exception in compaction merger");
+ LOGGER.error(e, "Exception in compaction merger");
mergeStatus = false;
}
}
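
Both merger processors now finish a compacted task the same way: instead of writing a partition-map file into the segment folder, they register the output through SegmentFileStore. A hedged sketch of that commit step, with the argument order exactly as in the hunks above:

    import java.io.IOException;
    import org.apache.carbondata.core.indexstore.PartitionSpec;
    import org.apache.carbondata.core.metadata.SegmentFileStore;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    final class SegmentFileCommitSketch {
      // Replaces the removed PartitionMapFileStore call: the task's output
      // under the partition location is recorded in a segment file.
      // A no-op for non-partitioned tables (spec == null).
      static void commitPartitionOutput(CarbonLoadModel model, PartitionSpec spec)
          throws IOException {
        if (spec != null) {
          SegmentFileStore.writeSegmentFile(model.getTablePath(), model.getTaskNo(),
              spec.getLocation().toString(), model.getFactTimeStamp() + "",
              spec.getPartitions());
        }
      }
    }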
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 68a212e..ff6ca93 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -47,9 +47,12 @@ public class RowResultProcessor {
CarbonDataProcessorUtil.createLocations(tempStoreLocation);
this.segmentProperties = segProp;
String tableName = carbonTable.getTableName();
+ String carbonStoreLocation = CarbonDataProcessorUtil
+ .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
+ tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
CarbonFactDataHandlerModel carbonFactDataHandlerModel =
CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
- segProp, tableName, tempStoreLocation);
+ segProp, tableName, tempStoreLocation, carbonStoreLocation);
CarbonDataFileAttributes carbonDataFileAttributes =
new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()),
loadModel.getFactTimeStamp());
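
RowResultProcessor never targets a partition location, so its call site always pre-computes the classic segment path before invoking the widened factory method. In sketch form, the two-step wiring now required of every caller (wrapper class is ours):

    import org.apache.carbondata.core.datastore.block.SegmentProperties;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
    import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

    final class HandlerModelSketch {
      // Compute the write path first, then hand it to the factory.
      static CarbonFactDataHandlerModel build(CarbonLoadModel loadModel,
          CarbonTable carbonTable, SegmentProperties segProp, String tableName,
          String[] tempStoreLocation) {
        String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
            carbonTable.getTablePath(), loadModel.getDatabaseName(), tableName,
            loadModel.getPartitionId(), loadModel.getSegmentId());
        return CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel,
            carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation);
      }
    }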
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d15152c..d77fcab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -275,7 +275,7 @@ public class CarbonFactDataHandlerModel {
*/
public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel,
CarbonTable carbonTable, SegmentProperties segmentProperties, String tableName,
- String[] tempStoreLocation) {
+ String[] tempStoreLocation, String carbonDataDirectoryPath) {
CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName());
@@ -307,9 +307,7 @@ public class CarbonFactDataHandlerModel {
measureDataTypes[i++] = msr.getDataType();
}
carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
- String carbonDataDirectoryPath = CarbonDataProcessorUtil
- .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
- tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+ CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -334,6 +332,10 @@ public class CarbonFactDataHandlerModel {
* @return data directory path
*/
private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
+ if (configuration.getDataWritePath() != null) {
+ CarbonUtil.checkAndCreateFolder(configuration.getDataWritePath());
+ return configuration.getDataWritePath();
+ }
AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
String carbonDataDirectoryPath = carbonTablePath
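
Two sides of the same move inside CarbonFactDataHandlerModel: the factory stops deriving the write path (callers now supply carbonDataDirectoryPath), and getCarbonDataFolderLocation gives an explicitly configured dataWritePath precedence over the table-path derivation. A sketch of the precedence rule, with the configuration accessor abstracted to a plain String argument (names other than CarbonUtil are ours):

    import org.apache.carbondata.core.util.CarbonUtil;

    final class WritePathSketch {
      // dataWritePath is set for partition loads that target a custom
      // location; when absent, fall back to the conventional segment
      // directory (passed in here as derivedSegmentPath).
      static String resolve(String dataWritePath, String derivedSegmentPath) {
        if (dataWritePath != null) {
          CarbonUtil.checkAndCreateFolder(dataWritePath);
          return dataWritePath;
        }
        return derivedSegmentPath;
      }
    }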
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 376a546..1e648e1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -302,7 +302,7 @@ public final class CarbonDataProcessorUtil {
}
public static boolean isHeaderValid(String tableName, String[] csvHeader,
- CarbonDataLoadSchema schema) {
+ CarbonDataLoadSchema schema, List<String> ignoreColumns) {
Iterator<String> columnIterator =
CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName).iterator();
Set<String> csvColumns = new HashSet<String>(csvHeader.length);
@@ -311,7 +311,8 @@ public final class CarbonDataProcessorUtil {
// file header should contain all columns of carbon table.
// So csvColumns should contain all elements of columnIterator.
while (columnIterator.hasNext()) {
- if (!csvColumns.contains(columnIterator.next().toLowerCase())) {
+ String column = columnIterator.next().toLowerCase();
+ if (!csvColumns.contains(column) && !ignoreColumns.contains(column)) {
return false;
}
}
@@ -377,7 +378,7 @@ public final class CarbonDataProcessorUtil {
*
* @return data directory path
*/
- public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
+ public static String createCarbonStoreLocation(String factStoreLocation,
String databaseName, String tableName, String partitionId, String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
@@ -385,7 +386,6 @@ public final class CarbonDataProcessorUtil {
CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
String carbonDataDirectoryPath =
carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
- CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
return carbonDataDirectoryPath;
}
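
The ignoreColumns parameter exists because partition column values live in the directory layout rather than in the CSV, so header validation must tolerate their absence; and the rename to createCarbonStoreLocation reflects that the folder-creation side effect moved to the callers. A self-contained sketch of the relaxed check (sample column names are made up):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    final class HeaderCheckSketch {
      // Mirrors the loop in isHeaderValid: every schema column must appear
      // in the CSV header unless explicitly ignored (e.g. a partition column).
      static boolean headerCovers(List<String> schemaColumns, Set<String> csvColumns,
          List<String> ignoreColumns) {
        for (String column : schemaColumns) {
          String lower = column.toLowerCase();
          if (!csvColumns.contains(lower) && !ignoreColumns.contains(lower)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Set<String> csv = new HashSet<>(Arrays.asList("id", "name"));
        // "country" is absent from the CSV but allowed, being a partition column.
        System.out.println(headerCovers(Arrays.asList("id", "name", "country"),
            csv, Arrays.asList("country")));   // prints true
      }
    }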
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 00f13a5..32c72da 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.Distributable;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -166,6 +167,23 @@ public final class CarbonLoaderUtil {
public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
throws IOException {
+ return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, uuid,
+ new ArrayList<Segment>(), new ArrayList<Segment>());
+ }
+
+ /**
+ * This API will write the load-level metadata for the load management module in order to
+ * manage load and query execution smoothly.
+ *
+ * @param newMetaEntry load metadata entry to write or update
+ * @param loadModel load model of the current operation
+ * @param loadStartEntry whether this is the initial load start entry
+ * @param insertOverwrite whether the operation is an insert overwrite
+ * @param uuid
+ * @param segmentsToBeDeleted segments to be marked for delete in the same update
+ * @param segmentFilesTobeUpdated segments whose segment file entries must be repointed
+ * @return boolean which determines whether status update is done or not.
+ * @throws IOException
+ */
+ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
+ CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
+ List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException {
boolean status = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -237,9 +255,11 @@ public final class CarbonLoaderUtil {
// existing entry needs to be overwritten as the entry will exist with some
// intermediate status
int indexToOverwriteNewMetaEntry = 0;
+ boolean found = false;
for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
if (entry.getLoadName().equals(newMetaEntry.getLoadName())
&& entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
+ found = true;
break;
}
indexToOverwriteNewMetaEntry++;
@@ -254,6 +274,10 @@ public final class CarbonLoaderUtil {
}
}
}
+ if (!found) {
+ LOGGER.error("Entry not found to update " + newMetaEntry + " From list :: "
+ + listOfLoadFolderDetails);
+ }
listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
}
// when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
@@ -262,6 +286,17 @@ public final class CarbonLoaderUtil {
addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
}
+ for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
+ // if the segment is in the marked-for-delete list then update its status.
+ if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+ } else if (segmentFilesTobeUpdated.contains(Segment.toSegment(detail.getLoadName()))) {
+ detail.setSegmentFile(
+ detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName()
+ + CarbonTablePath.SEGMENT_EXT);
+ }
+ }
+
SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
.toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
// Delete all old stale segment folders
@@ -907,8 +942,8 @@ public final class CarbonLoaderUtil {
String segmentId, CarbonTable carbonTable) throws IOException {
CarbonTablePath carbonTablePath =
CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
- Map<String, Long> dataIndexSize =
- CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+ Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath,
+ new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
loadMetadataDetails.setDataSize(String.valueOf(dataSize));
Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);
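
The new recordNewLoadMetadata overload lets a single table-status write also mark other segments for delete and repoint their segment files; the original signature simply delegates with empty lists. A hedged usage sketch (wrapper class is ours; the segment id is arbitrary):

    import java.io.IOException;
    import java.util.Collections;
    import org.apache.carbondata.core.datamap.Segment;
    import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.util.CarbonLoaderUtil;

    final class RecordMetadataSketch {
      // Records newMetaEntry and, in the same atomic status-file write,
      // flips segment "1" to MARKED_FOR_DELETE; no segment files need
      // updating in this example.
      static boolean record(LoadMetadataDetails newMetaEntry, CarbonLoadModel model,
          String uuid) throws IOException {
        return CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, model, false,
            false, uuid, Collections.singletonList(new Segment("1", null)),
            Collections.<Segment>emptyList());
      }
    }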
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 02ab1d8..52b9f52 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -18,16 +18,19 @@
package org.apache.carbondata.processing.util;
import java.io.IOException;
+import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -60,54 +63,61 @@ public final class DeleteLoadFolders {
}
public static void physicalFactAndMeasureMetadataDeletion(
- AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) {
+ AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete,
+ List<PartitionSpec> specs) {
LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
for (LoadMetadataDetails oneLoad : currentDetails) {
if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
- String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
- boolean status = false;
try {
- if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
- CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
- CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- return (CarbonTablePath.isCarbonDataFile(file.getName())
- || CarbonTablePath.isCarbonIndexFile(file.getName())
- || CarbonTablePath.isPartitionMapFile(file.getName()));
- }
- });
+ if (oneLoad.getSegmentFile() != null) {
+ SegmentFileStore
+ .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(),
+ specs);
+ } else {
+ String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
+ boolean status = false;
+ if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+ CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+ CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile file) {
+ return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
+ CarbonTablePath.isCarbonIndexFile(file.getName()));
+ }
+ });
- //if there are no fact and msr metadata files present then no need to keep
- //entry in metadata.
- if (filesToBeDeleted.length == 0) {
- status = true;
- } else {
+ // if no fact or measure metadata files are present then there is no need
+ // to keep the entry in metadata.
+ if (filesToBeDeleted.length == 0) {
+ status = true;
+ } else {
- for (CarbonFile eachFile : filesToBeDeleted) {
- if (!eachFile.delete()) {
- LOGGER.warn("Unable to delete the file as per delete command " + eachFile
- .getAbsolutePath());
- status = false;
- } else {
- status = true;
+ for (CarbonFile eachFile : filesToBeDeleted) {
+ if (!eachFile.delete()) {
+ LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+ .getAbsolutePath());
+ status = false;
+ } else {
+ status = true;
+ }
}
}
- }
- // need to delete the complete folder.
- if (status) {
- if (!file.delete()) {
- LOGGER.warn(
- "Unable to delete the folder as per delete command " + file.getAbsolutePath());
+ // need to delete the complete folder.
+ if (status) {
+ if (!file.delete()) {
+ LOGGER.warn("Unable to delete the folder as per delete command " + file
+ .getAbsolutePath());
+ }
}
+
+ } else {
+ LOGGER.warn("Files are not found in segment " + path
+ + " it seems, files are already being deleted");
}
- } else {
- LOGGER.warn("Files are not found in segment " + path
- + " it seems, files are already being deleted");
}
} catch (IOException e) {
- LOGGER.warn("Unable to delete the file as per delete command " + path);
+ LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
}
}
}
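
Physical deletion is now two-pathed: a load that carries a segment file is removed through SegmentFileStore.deleteSegment, which can also reach data under external partition locations via specs, while pre-restructure loads fall back to scanning the segment folder (whose file filter no longer looks for partition-map files). Condensed, with the legacy fallback abstracted away (wrapper class and Runnable are ours):

    import java.io.IOException;
    import java.util.List;
    import org.apache.carbondata.core.indexstore.PartitionSpec;
    import org.apache.carbondata.core.metadata.SegmentFileStore;
    import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;

    final class DeletePathSketch {
      // legacyDelete stands in for the retained folder-scanning else-branch.
      static void delete(String tablePath, LoadMetadataDetails oneLoad,
          List<PartitionSpec> specs, Runnable legacyDelete) throws IOException {
        if (oneLoad.getSegmentFile() != null) {
          // New layout: the segment file lists every data/index file,
          // including those under custom partition locations.
          SegmentFileStore.deleteSegment(tablePath, oneLoad.getSegmentFile(), specs);
        } else {
          legacyDelete.run();
        }
      }
    }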
http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 63320ef..cd1e28a 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -83,7 +83,7 @@ public class BlockIndexStoreTest extends TestCase {
// assertTrue(false);
// }
// List<String> segmentIds = new ArrayList<>();
-// segmentIds.add(info.getSegmentId());
+// segmentIds.add(info.getSegment());
// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
// }
//
@@ -151,7 +151,7 @@ public class BlockIndexStoreTest extends TestCase {
// }
// List<String> segmentIds = new ArrayList<>();
// for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-// segmentIds.add(tableBlockInfo.getSegmentId());
+// segmentIds.add(tableBlockInfo.getSegment());
// }
// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
// }
@@ -223,7 +223,7 @@ public class BlockIndexStoreTest extends TestCase {
// }
// List<String> segmentIds = new ArrayList<>();
// for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-// segmentIds.add(tableBlockInfo.getSegmentId());
+// segmentIds.add(tableBlockInfo.getSegment());
// }
// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
// }