Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/26 12:19:05 UTC
[5/9] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index bc84e04..20d3032 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
+import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema._
@@ -109,15 +111,14 @@ case class CarbonMergerMapping(
var mergedLoadName: String,
databaseName: String,
factTableName: String,
- validSegments: Array[String],
+ validSegments: Array[Segment],
tableId: String,
campactionType: CompactionType,
// maxSegmentColCardinality is Cardinality of last segment of compaction
var maxSegmentColCardinality: Array[Int],
// maxSegmentColumnSchemaList is list of column schema of last segment of compaction
var maxSegmentColumnSchemaList: List[ColumnSchema],
- currentPartitions: Seq[String],
- @transient partitionMapper: PartitionMapper)
+ currentPartitions: Option[Seq[PartitionSpec]])
case class NodeInfo(TaskId: String, noOfBlocks: Int)
@@ -133,20 +134,20 @@ case class UpdateTableModel(
isUpdate: Boolean,
updatedTimeStamp: Long,
var executorErrors: ExecutionErrors,
- deletedSegments: Seq[String])
+ deletedSegments: Seq[Segment])
case class CompactionModel(compactionSize: Long,
compactionType: CompactionType,
carbonTable: CarbonTable,
isDDLTrigger: Boolean,
- currentPartitions: Seq[String])
+ currentPartitions: Option[Seq[PartitionSpec]])
case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
carbonTable: CarbonTable,
loadsToMerge: util.List[LoadMetadataDetails],
sqlContext: SQLContext,
compactionType: CompactionType,
- currentPartitions: Seq[String])
+ currentPartitions: Option[Seq[PartitionSpec]])
case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
segmentId: String,
@@ -161,7 +162,7 @@ case class SplitPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext)
case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
- segmentId: String,
+ segmentId: Segment,
partitionId: String,
oldPartitionIds: List[Int],
dropWithData: Boolean,
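
Note: the models above now carry partitions as Option[Seq[PartitionSpec]] rather than Seq[String], which lets callers distinguish a non-partitioned table (None) from a partitioned table whose current filter matches nothing (Some of an empty sequence). A minimal, self-contained sketch of that contract, using a hypothetical simplified stand-in instead of the real org.apache.carbondata.core.indexstore.PartitionSpec:

// Hypothetical stand-in for PartitionSpec, for illustration only.
case class PartitionSpecLike(partitions: Seq[String], location: String)

object PartitionScopeDemo extends App {
  def describeScope(currentPartitions: Option[Seq[PartitionSpecLike]]): String =
    currentPartitions match {
      case None => "non-partitioned table: consider all data"
      case Some(specs) if specs.isEmpty => "partitioned table, but no partition is currently visible"
      case Some(specs) => s"partitioned table: restrict the operation to ${specs.size} partition(s)"
    }

  println(describeScope(None))
  println(describeScope(Some(Nil)))
  println(describeScope(
    Some(Seq(PartitionSpecLike(Seq("country=us"), "/store/db/t1/country=us")))))
}
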
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8ed7623..1695a13 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,8 +38,8 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
@@ -47,6 +47,7 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
@@ -207,7 +208,9 @@ object CarbonDataRDDFactory {
compactionType,
table,
compactionModel.isDDLTrigger,
- CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, table))
+ CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
+ TableIdentifier(table.getTableName,
+ Some(table.getDatabaseName))))
// proceed for compaction
try {
CompactionFactory.getCompactor(
@@ -395,26 +398,9 @@ object CarbonDataRDDFactory {
} catch {
case ex: Throwable =>
loadStatus = SegmentStatus.LOAD_FAILURE
- ex match {
- case sparkException: SparkException =>
- if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
- sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
- executorMessage = sparkException.getCause.getMessage
- errorMessage = errorMessage + ": " + executorMessage
- } else if (sparkException.getCause.isInstanceOf[TextParsingException]) {
- executorMessage = CarbonDataProcessorUtil
- .trimErrorMessage(sparkException.getCause.getMessage)
- errorMessage = errorMessage + " : " + executorMessage
- }
- case aex: AnalysisException =>
- LOGGER.error(aex.getMessage())
- throw aex
- case _ =>
- if (ex.getCause != null) {
- executorMessage = ex.getCause.getMessage
- errorMessage = errorMessage + ": " + executorMessage
- }
- }
+ val (extrMsgLocal, errorMsgLocal) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
+ executorMessage = extrMsgLocal
+ errorMessage = errorMsgLocal
LOGGER.info(errorMessage)
LOGGER.error(ex)
} finally {
@@ -423,14 +409,7 @@ object CarbonDataRDDFactory {
// handle the status file updation for the update cmd.
if (updateModel.isDefined) {
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
- if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
- updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
- if (null != executorMessage && !executorMessage.isEmpty) {
- updateModel.get.executorErrors.errorMsg = executorMessage
- } else {
- updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
- }
- }
+ CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
return
} else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
@@ -441,12 +420,12 @@ object CarbonDataRDDFactory {
// success case.
// write the dictionary file in case of single_pass true
writeDictionary(carbonLoadModel, result, false)
- val segmentDetails = new util.HashSet[String]()
+ val segmentDetails = new util.HashSet[Segment]()
var resultSize = 0
res.foreach { resultOfSeg =>
resultSize = resultSize + resultOfSeg.size
resultOfSeg.foreach { resultOfBlock =>
- segmentDetails.add(resultOfBlock._2._1.getLoadName)
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
}
}
@@ -462,7 +441,7 @@ object CarbonDataRDDFactory {
carbonTable,
updateModel.get.updatedTimeStamp + "",
true,
- new util.ArrayList[String](0))) {
+ new util.ArrayList[Segment](0))) {
LOGGER.audit("Data update is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
} else {
@@ -744,7 +723,9 @@ object CarbonDataRDDFactory {
CompactionType.MINOR,
carbonTable,
isCompactionTriggerByDDl,
- CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable))
+ CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
+ TableIdentifier(carbonTable.getTableName,
+ Some(carbonTable.getDatabaseName))))
var storeLocation = ""
val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
if (null != configuredStore && configuredStore.nonEmpty) {
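
Note: in the success path above, the set of touched segments is now keyed by Segment objects built as new Segment(loadName, null) instead of plain load-name strings. A small illustrative sketch with a hypothetical case-class stand-in for org.apache.carbondata.core.datamap.Segment, showing that structural equality preserves the HashSet de-duplication the old String-based code relied on:

import java.util

// Hypothetical stand-in for Segment; a case class gives structural equality, so adding
// the same load name twice (with the same null segment file name) collapses to one entry.
case class SegmentLike(segmentNo: String, segmentFileName: String)

object SegmentSetDemo extends App {
  val segmentDetails = new util.HashSet[SegmentLike]()
  Seq("0", "1", "1", "2").foreach { loadName =>
    segmentDetails.add(SegmentLike(loadName, null))
  }
  // The duplicated load "1" collapses, leaving 3 entries.
  println(segmentDetails.size())
}
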
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index e7bdff8..07acaa5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -18,26 +18,23 @@
package org.apache.carbondata.spark.rdd
import java.util
-import java.util.{List, Map}
+import java.util.List
import java.util.concurrent.ExecutorService
import scala.collection.JavaConverters._
-import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
import org.apache.carbondata.spark.util.CommonUtil
/**
@@ -136,39 +133,19 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
var finalMergeStatus = false
val databaseName: String = carbonLoadModel.getDatabaseName
val factTableName = carbonLoadModel.getTableName
- val validSegments: Array[String] = CarbonDataMergerUtil
- .getValidSegments(loadsToMerge).split(',')
- val partitionMapper = if (carbonTable.isHivePartitionTable) {
- var partitionMap: util.Map[String, util.List[String]] = null
- validSegments.foreach { segmentId =>
- val localMapper = new PartitionMapFileStore()
- localMapper.readAllPartitionsOfSegment(
- CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath, segmentId))
- if (partitionMap == null) {
- partitionMap = localMapper.getPartitionMap
- } else {
- partitionMap.putAll(localMapper.getPartitionMap)
- }
- }
- val mapper = new PartitionMapper()
- mapper.setPartitionMap(partitionMap)
- mapper
- } else {
- null
- }
+ val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
val carbonMergerMapping = CarbonMergerMapping(
tablePath,
carbonTable.getMetaDataFilepath,
mergedLoadName,
databaseName,
factTableName,
- validSegments,
+ validSegments.asScala.toArray,
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
compactionType,
maxSegmentColCardinality = null,
maxSegmentColumnSchemaList = null,
- currentPartitions = partitions,
- partitionMapper)
+ currentPartitions = partitions)
carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
carbonLoadModel.setLoadMetadataDetails(
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
@@ -221,11 +198,28 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
if (finalMergeStatus) {
val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
- new PartitionMapFileStore().mergePartitionMapFiles(
- CarbonTablePath.getSegmentPath(tablePath, mergedLoadNumber),
- carbonLoadModel.getFactTimeStamp + "")
+ var segmentFileName: String = null
+ if (carbonTable.isHivePartitionTable) {
+ val readPath =
+ CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
+ CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
+ // Merge all partition files into a single file.
+ segmentFileName =
+ mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
+ val segmentFile = SegmentFileStore
+ .mergeSegmentFiles(readPath,
+ segmentFileName,
+ CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
+ if (segmentFile != null) {
+ SegmentFileStore
+ .moveFromTempFolder(segmentFile,
+ carbonLoadModel.getFactTimeStamp + ".tmp",
+ carbonLoadModel.getTablePath)
+ }
+ segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
+ }
// trigger event for compaction
- val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =
+ val alterTableCompactionPreStatusUpdateEvent =
AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
carbonTable,
carbonMergerMapping,
@@ -242,9 +236,13 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
.updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
carbonTable.getMetaDataFilepath,
carbonLoadModel)) ||
- CarbonDataMergerUtil
- .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
- mergedLoadNumber, carbonLoadModel, compactionType)
+ CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
+ loadsToMerge,
+ carbonTable.getMetaDataFilepath,
+ mergedLoadNumber,
+ carbonLoadModel,
+ compactionType,
+ segmentFileName)
val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable,
carbonMergerMapping,
carbonLoadModel,
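
Note: for Hive-partitioned tables the compactor above now merges the per-partition segment files written under <factTimeStamp>.tmp into a single segment file named <mergedLoadNumber>_<factTimeStamp> plus the segment extension, and passes that name to updateLoadMetadataWithMergeStatus. A tiny sketch of the naming convention only, assuming the extension constant resolves to ".segment":

object SegmentFileNameDemo extends App {
  // Illustrative helper; ".segment" is assumed in place of CarbonTablePath.SEGMENT_EXT.
  def mergedSegmentFileName(mergedLoadNumber: String, factTimeStamp: Long): String =
    s"${mergedLoadNumber}_$factTimeStamp.segment"

  // e.g. compacting into load 0.1 at a given fact timestamp
  println(mergedSegmentFileName("0.1", 1519648745000L)) // 0.1_1519648745000.segment
}
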
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 833c6fe..c286c50 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -55,7 +55,9 @@ case class CarbonCountStar(
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
- TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))).asJava),
+ TableIdentifier(
+ carbonTable.getTableName,
+ Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
absoluteTableIdentifier)
val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
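
Note: CarbonFilters.getPartitions now returns Option[Seq[PartitionSpec]], so the Java-facing call above bridges it with .map(_.asJava).orNull. A minimal sketch of that idiom in isolation:

import java.util
import scala.collection.JavaConverters._

object JavaBridgeDemo extends App {
  // An Option[Seq[T]] becomes either a java.util.List[T] or null, which is what a
  // null-tolerant Java API expects.
  def toJavaOrNull[T](partitions: Option[Seq[T]]): util.List[T] =
    partitions.map(_.asJava).orNull

  println(toJavaOrNull(Some(Seq("country=us", "country=uk")))) // [country=us, country=uk]
  println(toJavaOrNull(None))                                  // null
}
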
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0978fab..46905b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.expression.Expression
@@ -67,7 +68,7 @@ case class CarbonDatasourceHadoopRelation(
def buildScan(requiredColumns: Array[String],
filters: Array[Filter],
- partitions: Seq[String]): RDD[InternalRow] = {
+ partitions: Seq[PartitionSpec]): RDD[InternalRow] = {
val filterExpression: Option[Expression] = filters.flatMap { filter =>
CarbonFilters.createCarbonFilter(schema, filter)
}.reduceOption(new AndExpression(_, _))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 667d550..7e3b699 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -206,7 +206,9 @@ case class CarbonAlterTableCompactionCommand(
compactionType,
carbonTable,
isCompactionTriggerByDDl,
- CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable)
+ CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
+ TableIdentifier(carbonTable.getTableName,
+ Some(carbonTable.getDatabaseName)))
)
val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 4f90fb5..d2adc57 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.exception.ConcurrentOperationException
@@ -89,14 +90,10 @@ case class CarbonCleanFilesCommand(
private def cleanGarbageData(sparkSession: SparkSession,
databaseNameOp: Option[String], tableName: String): Unit = {
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
- val partitions: Option[Seq[String]] = if (carbonTable.isHivePartitionTable) {
- Some(CarbonFilters.getPartitions(
- Seq.empty[Expression],
- sparkSession,
- TableIdentifier(tableName, databaseNameOp)))
- } else {
- None
- }
+ val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+ Seq.empty[Expression],
+ sparkSession,
+ TableIdentifier(tableName, databaseNameOp))
CarbonStore.cleanFiles(
dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
tableName = tableName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 9bdaddb..7800d3e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFile
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -51,11 +52,13 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
@@ -72,7 +75,7 @@ import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServicePr
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
case class CarbonLoadDataCommand(
@@ -97,6 +100,8 @@ case class CarbonLoadDataCommand(
var sizeInBytes: Long = _
+ var currPartitions: util.List[PartitionSpec] = _
+
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -123,6 +128,12 @@ case class CarbonLoadDataCommand(
case l: LogicalRelation => l
}.head
sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
+ currPartitions = CarbonFilters.getCurrentPartitions(
+ sparkSession,
+ TableIdentifier(tableName, databaseNameOp)) match {
+ case Some(parts) => new util.ArrayList(parts.toList.asJava)
+ case _ => null
+ }
}
operationContext.setProperty("isOverwrite", isOverwriteTable)
if(CarbonUtil.hasAggregationDataMap(table)) {
@@ -182,8 +193,9 @@ case class CarbonLoadDataCommand(
options,
optionsFinal,
carbonLoadModel,
- hadoopConf
- )
+ hadoopConf,
+ partition,
+ dataFrame.isDefined)
// Delete stale segment folders that are not in table status but are physically present in
// the Fact folder
LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
@@ -215,7 +227,10 @@ case class CarbonLoadDataCommand(
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
// Clean up the old invalid segment data before creating a new entry for new load.
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
+ DataLoadingUtil.deleteLoadsAndUpdateMetadata(
+ isForceDeletion = false,
+ table,
+ currPartitions)
// add the start entry for the new load in the table status file
if (updateModel.isEmpty && !table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -259,7 +274,8 @@ case class CarbonLoadDataCommand(
columnar,
partitionStatus,
hadoopConf,
- operationContext)
+ operationContext,
+ LOGGER)
} else {
loadData(
sparkSession,
@@ -267,7 +283,8 @@ case class CarbonLoadDataCommand(
columnar,
partitionStatus,
hadoopConf,
- operationContext)
+ operationContext,
+ LOGGER)
}
val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
new LoadTablePostExecutionEvent(
@@ -331,7 +348,9 @@ case class CarbonLoadDataCommand(
columnar: Boolean,
partitionStatus: SegmentStatus,
hadoopConf: Configuration,
- operationContext: OperationContext): Unit = {
+ operationContext: OperationContext,
+ LOGGER: LogService): Seq[Row] = {
+ var rows = Seq.empty[Row]
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
.getCarbonTableIdentifier
@@ -418,12 +437,13 @@ case class CarbonLoadDataCommand(
if (carbonTable.isHivePartitionTable) {
try {
- loadDataWithPartition(
+ rows = loadDataWithPartition(
sparkSession,
carbonLoadModel,
hadoopConf,
loadDataFrame,
- operationContext)
+ operationContext,
+ LOGGER)
} finally {
server match {
case Some(dictServer) =>
@@ -450,6 +470,7 @@ case class CarbonLoadDataCommand(
updateModel,
operationContext)
}
+ rows
}
private def loadData(
@@ -458,7 +479,9 @@ case class CarbonLoadDataCommand(
columnar: Boolean,
partitionStatus: SegmentStatus,
hadoopConf: Configuration,
- operationContext: OperationContext): Unit = {
+ operationContext: OperationContext,
+ LOGGER: LogService): Seq[Row] = {
+ var rows = Seq.empty[Row]
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
// getting all fields except tupleId field as it is not required in the value
@@ -478,12 +501,12 @@ case class CarbonLoadDataCommand(
dictionaryDataFrame)
}
if (table.isHivePartitionTable) {
- loadDataWithPartition(
+ rows = loadDataWithPartition(
sparkSession,
carbonLoadModel,
hadoopConf,
loadDataFrame,
- operationContext)
+ operationContext, LOGGER)
} else {
CarbonDataRDDFactory.loadCarbonData(
sparkSession.sqlContext,
@@ -497,6 +520,7 @@ case class CarbonLoadDataCommand(
updateModel,
operationContext)
}
+ rows
}
/**
@@ -504,24 +528,16 @@ case class CarbonLoadDataCommand(
* into partitoned data. The table relation would be converted to HadoopFSRelation to let spark
* handling the partitioning.
*/
- private def loadDataWithPartition(sparkSession: SparkSession,
+ private def loadDataWithPartition(
+ sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel,
hadoopConf: Configuration,
dataFrame: Option[DataFrame],
- operationContext: OperationContext): Unit = {
+ operationContext: OperationContext,
+ LOGGER: LogService): Seq[Row] = {
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
- val currentPartitions =
- CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
- // Clean up the alreday dropped partitioned data
- new PartitionMapFileStore().cleanSegments(table, currentPartitions.asJava, false)
- // Converts the data to carbon understandable format. The timestamp/date format data needs to
- // converted to hive standard fomat to let spark understand the data to partition.
- val serializationNullFormat =
- carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- val badRecordAction =
- carbonLoadModel.getBadRecordsAction.split(",")(1)
var timeStampformatString = carbonLoadModel.getTimestampformat
if (timeStampformatString.isEmpty) {
timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
@@ -532,127 +548,114 @@ case class CarbonLoadDataCommand(
dateFormatString = carbonLoadModel.getDefaultDateFormat
}
val dateFormat = new SimpleDateFormat(dateFormatString)
- CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
- CarbonSession.threadSet(
- CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
- timeStampformatString)
- CarbonSession.threadSet(
- CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
- serializationNullFormat)
- CarbonSession.threadSet(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- badRecordAction)
- val isEmptyBadRecord = carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)
- CarbonSession.threadSet(
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- isEmptyBadRecord)
+ // Clean up the alreday dropped partitioned data
+ SegmentFileStore.cleanSegments(table, null, false)
CarbonSession.threadSet("partition.operationcontext", operationContext)
// input data from csv files. Convert to logical plan
val allCols = new ArrayBuffer[String]()
allCols ++= table.getAllDimensions.asScala.map(_.getColName)
allCols ++= table.getAllMeasures.asScala.map(_.getColName)
var attributes =
- StructType(allCols.map(StructField(_, StringType))).toAttributes
+ StructType(
+ allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
+ StructField(_, StringType))).toAttributes
var partitionsLen = 0
val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
- def transformQuery(rdd: RDD[Row], isDataFrame: Boolean) = {
- val updatedRdd = convertData(rdd, sparkSession, carbonLoadModel, isDataFrame)
- val catalogAttributes = catalogTable.schema.toAttributes
- attributes = attributes.map(a => {
- catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
- })
- attributes = attributes.map { attr =>
- val column = table.getColumnByName(table.getTableName, attr.name)
- if (column.hasEncoding(Encoding.DICTIONARY)) {
- AttributeReference(
- attr.name,
- IntegerType,
- attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
- } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
- AttributeReference(
- attr.name,
- LongType,
- attr.nullable,
- attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
- } else {
- attr
- }
- }
- // Only select the required columns
- val output = if (partition.nonEmpty) {
- val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) }
- catalogTable.schema.map { attr =>
- attributes.find(_.name.equalsIgnoreCase(attr.name)).get
- }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
- } else {
- catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
- }
- partitionsLen = rdd.partitions.length
- val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession))
- if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
- val sortColumns = table.getSortColumns(table.getTableName)
- Sort(output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)),
- true,
- child)
- } else {
- child
- }
+ val partitionValues = if (partition.nonEmpty) {
+ partition.filter(_._2.nonEmpty).map{ case(col, value) =>
+ val field = catalogTable.schema.find(_.name.equalsIgnoreCase(col)).get
+ CarbonScalaUtil.convertToDateAndTimeFormats(
+ value.get,
+ field.dataType,
+ timeStampFormat,
+ dateFormat)
+ }.toArray
+ } else {
+ Array[String]()
}
-
+ var persistedRDD: Option[RDD[InternalRow]] = None
try {
val query: LogicalPlan = if (dataFrame.isDefined) {
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
- val dfAttributes =
- StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
- val partitionValues = if (partition.nonEmpty) {
- partition.values.filter(_.nonEmpty).map(_.get).toArray
+ val (rdd, dfAttributes) = if (updateModel.isDefined) {
+ // Get the updated query plan in case of update scenario
+ val updatedFrame = Dataset.ofRows(
+ sparkSession,
+ getLogicalQueryForUpdate(
+ sparkSession,
+ catalogTable,
+ dataFrame.get,
+ carbonLoadModel))
+ (updatedFrame.rdd, updatedFrame.schema)
} else {
- Array[String]()
+ if (partition.nonEmpty) {
+ val headers = carbonLoadModel.getCsvHeaderColumns.dropRight(partition.size)
+ val updatedHeader = headers ++ partition.keys.map(_.toLowerCase)
+ carbonLoadModel.setCsvHeader(updatedHeader.mkString(","))
+ carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(","))
+ }
+ (dataFrame.get.rdd, dataFrame.get.schema)
+ }
+
+ val expectedColumns = {
+ val staticPartCols = partition.filter(_._2.isDefined).keySet
+ attributes.filterNot(a => staticPartCols.contains(a.name))
+ }
+ if (expectedColumns.length != dfAttributes.length) {
+ throw new AnalysisException(
+ s"Cannot insert into table $tableName because the number of columns are different: " +
+ s"need ${expectedColumns.length} columns, " +
+ s"but query has ${dfAttributes.length} columns.")
+ }
+ val nonPartitionBounds = expectedColumns.zipWithIndex.map(_._2).toArray
+ val partitionBounds = new Array[Int](partitionValues.length)
+ if (partition.nonEmpty) {
+ val nonPartitionSchemaLen = attributes.length - partition.size
+ var i = nonPartitionSchemaLen
+ var index = 0
+ var partIndex = 0
+ partition.values.foreach { p =>
+ if (p.isDefined) {
+ partitionBounds(partIndex) = nonPartitionSchemaLen + index
+ partIndex = partIndex + 1
+ } else {
+ nonPartitionBounds(i) = nonPartitionSchemaLen + index
+ i = i + 1
+ }
+ index = index + 1
+ }
}
- val len = dfAttributes.length
- val rdd = dataFrame.get.rdd.map { f =>
+
+ val len = dfAttributes.length + partitionValues.length
+ val transRdd = rdd.map { f =>
val data = new Array[Any](len)
var i = 0
while (i < f.length) {
- data(i) =
- UTF8String.fromString(
- CarbonScalaUtil.getString(f.get(i),
- serializationNullFormat,
- delimiterLevel1,
- delimiterLevel2,
- timeStampFormat,
- dateFormat))
+ data(nonPartitionBounds(i)) = f.get(i)
i = i + 1
}
- if (partitionValues.length > 0) {
- var j = 0
- while (i < len) {
- data(i) = UTF8String.fromString(partitionValues(j))
- j = j + 1
- i = i + 1
- }
+ var j = 0
+ while (j < partitionBounds.length) {
+ data(partitionBounds(j)) = UTF8String.fromString(partitionValues(j))
+ j = j + 1
}
Row.fromSeq(data)
}
- val transRdd = if (updateModel.isDefined) {
- // Get the updated query plan in case of update scenario
- Dataset.ofRows(
+
+ val (transformedPlan, partitions, persistedRDDLocal) =
+ transformQuery(
+ transRdd,
sparkSession,
- getLogicalQueryForUpdate(
- sparkSession,
- catalogTable,
- dfAttributes,
- rdd.map(row => InternalRow.fromSeq(row.toSeq)),
- carbonLoadModel)).rdd
- } else {
- rdd
- }
- transformQuery(transRdd, true)
+ carbonLoadModel,
+ partitionValues,
+ catalogTable,
+ attributes,
+ sortScope,
+ isDataFrame = true)
+ partitionsLen = partitions
+ persistedRDD = persistedRDDLocal
+ transformedPlan
} else {
-
val rowDataTypes = attributes.map { attribute =>
catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
case Some(attr) => attr.dataType
@@ -667,12 +670,29 @@ case class CarbonLoadDataCommand(
}
}
val columnCount = carbonLoadModel.getCsvHeaderColumns.length
- var rdd = DataLoadingUtil.csvFileScanRDD(
+ val rdd = DataLoadingUtil.csvFileScanRDD(
sparkSession,
model = carbonLoadModel,
hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
- transformQuery(rdd.asInstanceOf[RDD[Row]], false)
+ val (transformedPlan, partitions, persistedRDDLocal) =
+ transformQuery(
+ rdd.asInstanceOf[RDD[Row]],
+ sparkSession,
+ carbonLoadModel,
+ partitionValues,
+ catalogTable,
+ attributes,
+ sortScope,
+ isDataFrame = false)
+ partitionsLen = partitions
+ persistedRDD = persistedRDDLocal
+ transformedPlan
+ }
+ if (updateModel.isDefined) {
+ carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
}
+ // Create and ddd the segment to the tablestatus.
+ CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
val convertRelation = convertToLogicalRelation(
catalogTable,
sizeInBytes,
@@ -703,23 +723,37 @@ case class CarbonLoadDataCommand(
overwrite = false,
ifPartitionNotExists = false)
Dataset.ofRows(sparkSession, convertedPlan)
+ } catch {
+ case ex: Throwable =>
+ val (executorMessage, errorMessage) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
+ if (updateModel.isDefined) {
+ CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
+ }
+ LOGGER.info(errorMessage)
+ LOGGER.error(ex)
+ throw new Exception(errorMessage)
} finally {
- CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
- CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT)
- CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
- CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
- CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
CarbonSession.threadUnset("partition.operationcontext")
if (isOverwriteTable) {
DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
// Clean the overwriting segments if any.
- new PartitionMapFileStore().cleanSegments(
+ SegmentFileStore.cleanSegments(
table,
- CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava,
+ null,
false)
}
+ if (partitionsLen > 1) {
+ // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call
+ // will not have any functional impact as spark automatically monitors the cache usage on
+ // each node and drops out old data partitions in a least-recently used (LRU) fashion.
+ persistedRDD match {
+ case Some(rdd) => rdd.unpersist(false)
+ case _ =>
+ }
+ }
}
try {
+ carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
// Trigger auto compaction
CarbonDataRDDFactory.handleSegmentMerging(
sparkSession.sqlContext,
@@ -732,37 +766,140 @@ case class CarbonLoadDataCommand(
"Dataload is success. Auto-Compaction has failed. Please check logs.",
e)
}
+ val specs =
+ SegmentFileStore.getPartitionSpecs(carbonLoadModel.getSegmentId, carbonLoadModel.getTablePath)
+ if (specs != null) {
+ specs.asScala.map{ spec =>
+ Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
+ }
+ } else {
+ Seq.empty[Row]
+ }
}
+ /**
+ * Transform the rdd to logical plan as per the sortscope. If it is global sort scope then it
+ * will convert to sort logical plan otherwise project plan.
+ */
+ private def transformQuery(rdd: RDD[Row],
+ sparkSession: SparkSession,
+ loadModel: CarbonLoadModel,
+ partitionValues: Array[String],
+ catalogTable: CatalogTable,
+ curAttributes: Seq[AttributeReference],
+ sortScope: SortScopeOptions.SortScope,
+ isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+ // Converts the data as per the loading steps before give it to writer or sorter
+ val updatedRdd = convertData(
+ rdd,
+ sparkSession,
+ loadModel,
+ isDataFrame,
+ partitionValues)
+ val catalogAttributes = catalogTable.schema.toAttributes
+ var attributes = curAttributes.map(a => {
+ catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
+ })
+ attributes = attributes.map { attr =>
+ // Update attribute datatypes in case of dictionary columns, in case of dictionary columns
+ // datatype is always int
+ val column = table.getColumnByName(table.getTableName, attr.name)
+ if (column.hasEncoding(Encoding.DICTIONARY)) {
+ AttributeReference(
+ attr.name,
+ IntegerType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+ } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
+ AttributeReference(
+ attr.name,
+ LongType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+ } else {
+ attr
+ }
+ }
+ // Only select the required columns
+ val output = if (partition.nonEmpty) {
+ val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) }
+ catalogTable.schema.map { attr =>
+ attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+ }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
+ } else {
+ catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+ }
+ val partitionsLen = rdd.partitions.length
+
+ // If it is global sort scope then appl sort logical plan on the sort columns
+ if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
+ // Because if the number of partitions greater than 1, there will be action operator(sample)
+ // in sortBy operator. So here we cache the rdd to avoid do input and convert again.
+ if (partitionsLen > 1) {
+ updatedRdd.persist(StorageLevel.fromString(
+ CarbonProperties.getInstance().getGlobalSortRddStorageLevel))
+ }
+ val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession))
+ val sortColumns = table.getSortColumns(table.getTableName)
+ val sortPlan =
+ Sort(
+ output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)),
+ global = true,
+ child)
+ (sortPlan, partitionsLen, Some(updatedRdd))
+ } else {
+ (Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession)), partitionsLen, None)
+ }
+ }
+
+ /**
+ * Convert the rdd as per steps of data loading inputprocessor step and coverter step
+ * @param originRDD
+ * @param sparkSession
+ * @param model
+ * @param isDataFrame
+ * @param partitionValues
+ * @return
+ */
private def convertData(
originRDD: RDD[Row],
sparkSession: SparkSession,
model: CarbonLoadModel,
- isDataFrame: Boolean): RDD[InternalRow] = {
+ isDataFrame: Boolean,
+ partitionValues: Array[String]): RDD[InternalRow] = {
model.setPartitionId("0")
val sc = sparkSession.sparkContext
+ val info =
+ model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
+ info.setColumnSchemaList(new util.ArrayList[ColumnSchema](info.getColumnSchemaList))
val modelBroadcast = sc.broadcast(model)
val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
// 1. Input
- var convertRDD =
+ val convertRDD =
if (isDataFrame) {
originRDD.mapPartitions{rows =>
DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)
}
} else {
- originRDD.map{row =>
- val array = new Array[AnyRef](row.length)
+ // Append the partition columns in case of static partition scenario
+ val partitionLen = partitionValues.length
+ val len = model.getCsvHeaderColumns.length - partitionLen
+ originRDD.map{ row =>
+ val array = new Array[AnyRef](len + partitionLen)
var i = 0
- while (i < array.length) {
+ while (i < len) {
array(i) = row.get(i).asInstanceOf[AnyRef]
i = i + 1
}
+ if (partitionLen > 0) {
+ System.arraycopy(partitionValues, 0, array, i, partitionLen)
+ }
array
}
}
- val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) =>
+ val finalRDD = convertRDD.mapPartitionsWithIndex {case(index, rows) =>
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
DataLoadProcessorStepOnSpark.inputAndconvertFunc(
rows,
@@ -783,12 +920,11 @@ case class CarbonLoadDataCommand(
private def getLogicalQueryForUpdate(
sparkSession: SparkSession,
catalogTable: CatalogTable,
- attributes: Seq[AttributeReference],
- rdd: RDD[InternalRow],
+ df: DataFrame,
carbonLoadModel: CarbonLoadModel): LogicalPlan = {
sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
// In case of update, we don't need the segmrntid column in case of partitioning
- val dropAttributes = attributes.dropRight(1)
+ val dropAttributes = df.logicalPlan.output.dropRight(1)
val finalOutput = catalogTable.schema.map { attr =>
dropAttributes.find { d =>
val index = d.name.lastIndexOf("-updatedColumn")
@@ -801,7 +937,7 @@ case class CarbonLoadDataCommand(
}
carbonLoadModel.setCsvHeader(catalogTable.schema.map(_.name.toLowerCase).mkString(","))
carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(","))
- Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
+ Project(finalOutput, df.logicalPlan)
}
private def convertToLogicalRelation(
@@ -855,9 +991,19 @@ case class CarbonLoadDataCommand(
if (updateModel.isDefined) {
options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))
if (updateModel.get.deletedSegments.nonEmpty) {
- options += (("segmentsToBeDeleted", updateModel.get.deletedSegments.mkString(",")))
+ options += (("segmentsToBeDeleted",
+ updateModel.get.deletedSegments.map(_.getSegmentNo).mkString(",")))
}
}
+ if (currPartitions != null) {
+ val currPartStr = ObjectSerializationUtil.convertObjectToString(currPartitions)
+ options += (("currentpartition", currPartStr))
+ }
+ if (loadModel.getSegmentId != null) {
+ val currLoadEntry =
+ ObjectSerializationUtil.convertObjectToString(loadModel.getCurrentLoadMetadataDetail)
+ options += (("currentloadentry", currLoadEntry))
+ }
val hdfsRelation = HadoopFsRelation(
location = catalog,
partitionSchema = partitionSchema,
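
Note: the reworked load path above routes each incoming field to its position in the table schema through two index arrays: nonPartitionBounds for query-supplied columns (including dynamic partition columns) and partitionBounds for static partition values. A standalone sketch of that bookkeeping with plain Scala values in place of Spark rows; the column names and values are illustrative only:

import scala.collection.immutable.ListMap

object PartitionBoundsDemo extends App {
  val allColumns = Seq("id", "name", "country", "dt")          // partition columns last
  val partition: ListMap[String, Option[String]] =
    ListMap("country" -> Some("us"), "dt" -> None)             // country static, dt dynamic

  val nonPartitionSchemaLen = allColumns.length - partition.size
  val staticValues = partition.collect { case (_, Some(v)) => v }.toArray

  // The query supplies every column except the static partition columns.
  val nonPartitionBounds =
    Array.tabulate(allColumns.length - staticValues.length)(identity)
  val partitionBounds = new Array[Int](staticValues.length)
  var i = nonPartitionSchemaLen
  var index = 0
  var partIndex = 0
  partition.values.foreach { p =>
    if (p.isDefined) {
      partitionBounds(partIndex) = nonPartitionSchemaLen + index
      partIndex += 1
    } else {
      nonPartitionBounds(i) = nonPartitionSchemaLen + index
      i += 1
    }
    index += 1
  }

  // One incoming row: id, name, then the dynamic partition value for dt.
  val queryRow = Array[Any](1, "alice", "2018-02-26")
  val data = new Array[Any](allColumns.length)
  queryRow.indices.foreach(k => data(nonPartitionBounds(k)) = queryRow(k))
  staticValues.indices.foreach(j => data(partitionBounds(j)) = staticValues(j))

  println(allColumns.zip(data).mkString(", "))
  // (id,1), (name,alice), (country,us), (dt,2018-02-26)
}
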
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 72ed051..e3e4c7a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -28,8 +28,10 @@ import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, Me
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, PartitionMapFileStore}
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -228,10 +230,17 @@ case class RefreshCarbonTableCommand(
val allpartitions = metadataDetails.map{ metadata =>
if (metadata.getSegmentStatus == SegmentStatus.SUCCESS ||
metadata.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
- val mapper = new PartitionMapFileStore()
- mapper.readAllPartitionsOfSegment(
- CarbonTablePath.getSegmentPath(absIdentifier.getTablePath, metadata.getLoadName))
- Some(mapper.getPartitionMap.values().asScala)
+ val mapper = new SegmentFileStore(absIdentifier.getTablePath, metadata.getSegmentFile)
+ val specs = mapper.getLocationMap.asScala.map { case(location, fd) =>
+ var updatedLoc =
+ if (fd.isRelative) {
+ absIdentifier.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + location
+ } else {
+ location
+ }
+ new PartitionSpec(fd.getPartitions, updatedLoc)
+ }
+ Some(specs)
} else {
None
}
@@ -240,14 +249,14 @@ case class RefreshCarbonTableCommand(
TableIdentifier(absIdentifier.getTableName, Some(absIdentifier.getDatabaseName))
// Register the partition information to the hive metastore
allpartitions.foreach { segPartitions =>
- val specs: Seq[TablePartitionSpec] = segPartitions.map { indexPartitions =>
- indexPartitions.asScala.map{ p =>
+ val specs: Seq[(TablePartitionSpec, Option[String])] = segPartitions.map { indexPartitions =>
+ (indexPartitions.getPartitions.asScala.map{ p =>
val spec = p.split("=")
(spec(0), spec(1))
- }.toMap
+ }.toMap, Some(indexPartitions.getLocation.toString))
}.toSeq
// Add partition information
- AlterTableAddPartitionCommand(identifier, specs.map((_, None)), true).run(sparkSession)
+ AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession)
}
}
}
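
Note: the refresh command above rebuilds PartitionSpec objects from the segment file's location map, prefixing relative locations with the table path before registering them (with their locations) in the Hive metastore. A small sketch of that resolution, with "/" standing in for CarbonCommonConstants.FILE_SEPARATOR:

object PartitionLocationDemo extends App {
  // A relative segment-file location is resolved against the table path; an absolute one
  // (e.g. a custom partition location) is kept as-is. Paths here are illustrative only.
  def resolvePartitionLocation(tablePath: String, location: String, isRelative: Boolean): String =
    if (isRelative) tablePath + "/" + location else location

  println(resolvePartitionLocation("/store/db/t1", "country=us", isRelative = true))
  // -> /store/db/t1/country=us
  println(resolvePartitionLocation("/store/db/t1", "/warehouse/ext/country=uk", isRelative = false))
  // -> /warehouse/ext/country=uk
}
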
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 756d120..4886676 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -173,7 +174,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
sparkSession: SparkSession,
currentTime: Long,
executorErrors: ExecutionErrors,
- deletedSegments: Seq[String]): Unit = {
+ deletedSegments: Seq[Segment]): Unit = {
def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 1ac0b34..25d5e91 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
@@ -61,15 +62,20 @@ object DeleteExecution {
dataRdd: RDD[Row],
timestamp: String,
isUpdateOperation: Boolean,
- executorErrors: ExecutionErrors): Seq[String] = {
+ executorErrors: ExecutionErrors): Seq[Segment] = {
var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
- val factPath = carbonTablePath.getFactDir
- var segmentsTobeDeleted = Seq.empty[String]
+ val isPartitionTable = carbonTable.isHivePartitionTable
+ val factPath = if (isPartitionTable) {
+ carbonTablePath.getPath
+ } else {
+ carbonTablePath.getFactDir
+ }
+ var segmentsTobeDeleted = Seq.empty[Segment]
val deleteRdd = if (isUpdateOperation) {
val schema =
@@ -104,7 +110,7 @@ object DeleteExecution {
CarbonFilters.getPartitions(
Seq.empty,
sparkSession,
- TableIdentifier(tableName, databaseNameOp)).asJava)
+ TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
CarbonUpdateUtil
.createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
@@ -144,12 +150,12 @@ object DeleteExecution {
// all or none : update status file, only if complete delete opeartion is successfull.
def checkAndUpdateStatusFiles(): Unit = {
val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
- val segmentDetails = new util.HashSet[String]()
+ val segmentDetails = new util.HashSet[Segment]()
res.foreach(resultOfSeg => resultOfSeg.foreach(
resultOfBlock => {
if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
blockUpdateDetailsList.add(resultOfBlock._2._1)
- segmentDetails.add(resultOfBlock._2._1.getSegmentName)
+ segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null))
// if this block is invalid then decrement block count in map.
if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
@@ -250,7 +256,7 @@ object DeleteExecution {
countOfRows = countOfRows + 1
}
- val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
+ val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath, isPartitionTable)
val completeBlockName = CarbonTablePath
.addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
CarbonCommonConstants.FACT_FILE_EXT)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index bdecac1..f88e767 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
@@ -113,7 +114,7 @@ object HorizontalCompaction {
absTableIdentifier: AbsoluteTableIdentifier,
segmentUpdateStatusManager: SegmentUpdateStatusManager,
factTimeStamp: Long,
- segLists: util.List[String]): Unit = {
+ segLists: util.List[Segment]): Unit = {
val db = carbonTable.getDatabaseName
val table = carbonTable.getTableName
// get the valid segments qualified for update compaction.
@@ -163,7 +164,7 @@ object HorizontalCompaction {
absTableIdentifier: AbsoluteTableIdentifier,
segmentUpdateStatusManager: SegmentUpdateStatusManager,
factTimeStamp: Long,
- segLists: util.List[String]): Unit = {
+ segLists: util.List[Segment]): Unit = {
val db = carbonTable.getDatabaseName
val table = carbonTable.getTableName
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
new file mode 100644
index 0000000..2aaecc7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+/**
+ * Adds the partition to Hive and creates a new segment if the location already has data.
+ *
+ */
+case class CarbonAlterTableAddHivePartitionCommand(
+ tableName: TableIdentifier,
+ partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+ ifNotExists: Boolean)
+ extends AtomicRunnableCommand {
+
+ var partitionSpecsAndLocsTobeAdded : util.List[PartitionSpec] = _
+ var table: CarbonTable = _
+
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ if (table.isHivePartitionTable) {
+ val partitionWithLoc = partitionSpecsAndLocs.filter(_._2.isDefined)
+ if (partitionWithLoc.nonEmpty) {
+ val partitionSpecs = partitionWithLoc.map{ case (part, location) =>
+ new PartitionSpec(
+ new util.ArrayList(part.map(p => p._1 + "=" + p._2).toList.asJava),
+ location.get)
+ }
+ // Get all the partitions which are not already present in hive.
+ val currParts = CarbonFilters.getCurrentPartitions(sparkSession, tableName).get
+ partitionSpecsAndLocsTobeAdded =
+ new util.ArrayList(partitionSpecs.filterNot { part =>
+ currParts.exists(p => part.equals(p))
+ }.asJava)
+ }
+ AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession)
+ }
+ Seq.empty[Row]
+ }
+
+
+ override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+ AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
+ val msg = s"Got exception $exception when processing data of add partition. " +
+ "Dropping partitions from the metadata"
+ LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
+ Seq.empty[Row]
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ // Partitions with physical data should be registered as a new segment.
+ if (partitionSpecsAndLocsTobeAdded != null && partitionSpecsAndLocsTobeAdded.size() > 0) {
+ val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
+ partitionSpecsAndLocsTobeAdded)
+ if (segmentFile != null) {
+ val loadModel = new CarbonLoadModel
+ loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
+ // Create new entry in tablestatus file
+ CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
+ val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
+ val segmentFileName =
+ loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+ newMetaEntry.setSegmentFile(segmentFileName)
+ val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+ CarbonUtil.checkAndCreateFolder(segmentsLoc)
+ val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
+ SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
+ CarbonLoaderUtil.populateNewLoadMetaEntry(
+ newMetaEntry,
+ SegmentStatus.SUCCESS,
+ loadModel.getFactTimeStamp,
+ true)
+ // Add size to the entry
+ CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table)
+ // Mark the load as successful in the table status
+ CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
+ }
+ }
+ Seq.empty[Row]
+ }
+
+}
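Editor's note: for context, a hedged usage sketch of the partition-location feature this new command enables; the table name, partition column and path are illustrative, and a SparkSession named spark with the Carbon extensions is assumed. Only the SQL shape follows from the command above:

  // Register an external location as a partition of a Hive-partitioned Carbon table.
  spark.sql(
    """ALTER TABLE sales ADD IF NOT EXISTS
      |PARTITION (country='US') LOCATION '/data/external/sales/country=US'""".stripMargin)
  // processMetadata registers the partition in Hive; processData then scans the location
  // and, if it finds data, writes a segment file and records a SUCCESS entry in table status.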
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index cb4dece..407057e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -25,17 +25,17 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
-import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
+import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
/**
* Drop the partitions from hive and carbon store. It drops the partitions in following steps
@@ -59,16 +59,43 @@ case class CarbonAlterTableDropHivePartitionCommand(
retainData: Boolean)
extends AtomicRunnableCommand {
+ var carbonPartitionsTobeDropped : util.List[PartitionSpec] = _
+ var table: CarbonTable = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
- val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
if (CarbonUtil.hasAggregationDataMap(table)) {
throw new AnalysisException(
"Partition can not be dropped as it is mapped to Pre Aggregate table")
}
if (table.isHivePartitionTable) {
+ var locks = List.empty[ICarbonLock]
try {
- specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+ LockUsage.COMPACTION_LOCK,
+ LockUsage.DELETE_SEGMENT_LOCK,
+ LockUsage.DROP_TABLE_LOCK,
+ LockUsage.CLEAN_FILES_LOCK,
+ LockUsage.ALTER_PARTITION_LOCK)
+ locks = AlterTableUtil.validateTableAndAcquireLock(
+ table.getDatabaseName,
+ table.getTableName,
+ locksToBeAcquired)(sparkSession)
+ val partitions =
+ specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+ val carbonPartitions = partitions.map { partition =>
+ new PartitionSpec(new util.ArrayList[String](
+ partition.spec.seq.map { case (column, value) => column + "=" + value }.toList.asJava),
+ partition.location)
+ }
+ carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
+ // Drop the partitions from hive.
+ AlterTableDropPartitionCommand(
+ tableName,
+ specs,
+ ifExists,
+ purge,
+ retainData).run(sparkSession)
} catch {
case e: Exception =>
if (!ifExists) {
@@ -77,15 +104,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
log.warn(e.getMessage)
return Seq.empty[Row]
}
+ } finally {
+ AlterTableUtil.releaseLocks(locks)
}
- // Drop the partitions from hive.
- AlterTableDropPartitionCommand(
- tableName,
- specs,
- ifExists,
- purge,
- retainData).run(sparkSession)
}
Seq.empty[Row]
}
@@ -100,7 +122,6 @@ case class CarbonAlterTableDropHivePartitionCommand(
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
- val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
var locks = List.empty[ICarbonLock]
val uniqueId = System.currentTimeMillis().toString
try {
@@ -119,48 +140,27 @@ case class CarbonAlterTableDropHivePartitionCommand(
}.toSet
val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
.getValidAndInvalidSegments.getValidSegments
- try {
- // First drop the partitions from partition mapper files of each segment
- new CarbonDropPartitionRDD(sparkSession.sparkContext,
- table.getTablePath,
- segments.asScala,
- partitionNames.toSeq,
- uniqueId,
- partialMatch = true).collect()
- } catch {
- case e: Exception =>
- // roll back the drop partitions from carbon store
- new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
- table.getTablePath,
- segments.asScala,
- false,
- uniqueId,
- partitionNames.toSeq).collect()
- throw e
- }
- // commit the drop partitions from carbon store
- new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
+ // First drop the partitions from partition mapper files of each segment
+ val tuples = new CarbonDropPartitionRDD(sparkSession.sparkContext,
table.getTablePath,
segments.asScala,
- true,
- uniqueId,
- partitionNames.toSeq).collect()
- // Update the loadstatus with update time to clear cache from driver.
- val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
- .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
- CarbonUpdateUtil.updateTableMetadataStatus(
- segmentSet,
- table,
- uniqueId,
- true,
- new util.ArrayList[String])
+ carbonPartitionsTobeDropped,
+ uniqueId).collect()
+ val tobeUpdatedSegs = new util.ArrayList[String]
+ val tobeDeletedSegs = new util.ArrayList[String]
+ tuples.foreach{case (tobeUpdated, tobeDeleted) =>
+ if (tobeUpdated.split(",").length > 0) {
+ tobeUpdatedSegs.add(tobeUpdated.split(",")(0))
+ }
+ if (tobeDeleted.split(",").length > 0) {
+ tobeDeletedSegs.add(tobeDeleted.split(",")(0))
+ }
+ }
+ SegmentFileStore.commitDropPartitions(table, uniqueId, tobeUpdatedSegs, tobeDeletedSegs)
DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
} finally {
AlterTableUtil.releaseLocks(locks)
- new PartitionMapFileStore().cleanSegments(
- table,
- new util.ArrayList(CarbonFilters.getPartitions(Seq.empty, sparkSession, tableName).asJava),
- false)
+ SegmentFileStore.cleanSegments(table, null, false)
}
Seq.empty[Row]
}
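Editor's note: the reworked processMetadata wraps the Hive partition listing and drop in table-level locks. A minimal sketch of that acquire/try/finally pattern, using only the AlterTableUtil and LockUsage calls visible in the hunk above; withTableLocks is a hypothetical helper name, and the lock list is abbreviated for illustration:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.util.AlterTableUtil
  import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
  import org.apache.carbondata.core.metadata.schema.table.CarbonTable

  // Hypothetical helper: run a body while holding the given table's locks.
  def withTableLocks(table: CarbonTable, sparkSession: SparkSession)(body: => Unit): Unit = {
    var locks = List.empty[ICarbonLock]
    try {
      // Acquire every required lock up front; the call throws if any lock cannot be taken.
      locks = AlterTableUtil.validateTableAndAcquireLock(
        table.getDatabaseName,
        table.getTableName,
        List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK))(sparkSession)
      body
    } finally {
      // Release whatever was acquired, even when the body throws.
      AlterTableUtil.releaseLocks(locks)
    }
  }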
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 7fe2658..38ac58e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -204,7 +204,7 @@ case class CarbonAlterTableDropPartitionCommand(
val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
var i = 0
- for (segmentId: String <- validSegments) {
+ for (segmentId: Segment <- validSegments) {
threadArray(i) = dropPartitionThread(sqlContext, carbonLoadModel, executor,
segmentId, partitionId, dropWithData, oldPartitionIds)
threadArray(i).start()
@@ -216,7 +216,7 @@ case class CarbonAlterTableDropPartitionCommand(
val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
- refresher.refreshSegments(validSegments.asJava)
+ refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
} catch {
case e: Exception =>
LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
@@ -238,7 +238,7 @@ case class CarbonAlterTableDropPartitionCommand(
case class dropPartitionThread(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
executor: ExecutorService,
- segmentId: String,
+ segmentId: Segment,
partitionId: String,
dropWithData: Boolean,
oldPartitionIds: List[Int]) extends Thread {
@@ -259,7 +259,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
private def executeDroppingPartition(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
executor: ExecutorService,
- segmentId: String,
+ segmentId: Segment,
partitionId: String,
dropWithData: Boolean,
oldPartitionIds: List[Int]): Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 020a72c..7aefbbe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -215,7 +215,7 @@ case class CarbonAlterTableSplitPartitionCommand(
var i = 0
validSegments.foreach { segmentId =>
threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
- segmentId, partitionId, oldPartitionIdList)
+ segmentId.getSegmentNo, partitionId, oldPartitionIdList)
threadArray(i).start()
i += 1
}
@@ -225,7 +225,7 @@ case class CarbonAlterTableSplitPartitionCommand(
val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
- refresher.refreshSegments(validSegments.asJava)
+ refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
} catch {
case e: Exception =>
LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index d2acb00..59c43aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -178,7 +179,11 @@ case class CreatePreAggregateTableCommand(
// This will be used to check if the parent table has any segments or not. If not then no
// need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
// table.
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable)
+ DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false,
+ parentTable,
+ CarbonFilters.getCurrentPartitions(sparkSession,
+ TableIdentifier(parentTable.getTableName,
+ Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
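Editor's note: CarbonFilters.getCurrentPartitions now returns Option[Seq[PartitionSpec]], while the Java-side deleteLoadsAndUpdateMetadata expects a nullable java.util.List, hence the map(_.asJava).orNull adapter used above and in DeleteExecution. A standalone sketch of the pattern, generic over the element type:

  import scala.collection.JavaConverters._

  // Some(seq) becomes a java.util.List; None becomes null, matching the core API's contract.
  def toNullableJavaList[T](parts: Option[Seq[T]]): java.util.List[T] =
    parts.map(_.asJava).orNull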
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index ed6be97..657e0c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompac
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
@@ -227,11 +228,17 @@ object LoadPostAggregateListener extends OperationEventListener {
operationContext.getProperty(
s"${parentTableDatabase}_${parentTableName}_Segment").toString)
} else {
- (TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
- carbonLoadModel.getSegmentId)
+ val currentSegmentFile = operationContext.getProperty("current.segmentfile")
+ val segment = if (currentSegmentFile != null) {
+ new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
+ } else {
+ Segment.toSegment(carbonLoadModel.getSegmentId)
+ }
+ (TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
}
+
PreAggregateUtil.startDataLoadForDataMap(
- parentTableIdentifier,
+ parentTableIdentifier,
segmentToLoad,
validateSegments = false,
childLoadCommand,