Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/09/27 06:34:08 UTC
[5/6] carbondata git commit: [CARBONDATA-1151] Refactor all carbon
command to separate file in spark2 integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
deleted file mode 100644
index 5820b9d..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ /dev/null
@@ -1,857 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
-import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
-import org.apache.carbondata.hadoop.CarbonInputFormat
-import org.apache.carbondata.processing.exception.MultipleMatchingException
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.load.FailureCauses
-import org.apache.carbondata.spark.util.QueryPlanUtil
-
-
-/**
- * Framework for the IUD (update/delete) commands and horizontal compaction.
- *
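- * These commands back CarbonData's UPDATE and DELETE SQL. An illustrative
- * sketch (`spark` stands in for a SparkSession with Carbon support; table
- * and column names are made up):
- * {{{
- *   spark.sql("UPDATE t SET (t.b) = (t.b + 1) WHERE t.a = 'x'")
- *   spark.sql("DELETE FROM t WHERE t.a = 'x'")
- * }}}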
- */
-
-private[sql] case class ProjectForDeleteCommand(
- plan: LogicalPlan,
- identifier: Seq[String],
- timestamp: String) extends RunnableCommand with DataProcessCommand {
-
- var horizontalCompactionFailed = false
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val dataFrame = Dataset.ofRows(sparkSession, plan)
- val dataRdd = dataFrame.rdd
-
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(deleteExecution.getTableIdentifier(identifier))(sparkSession).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val metadataLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- var lockStatus = false
- try {
- lockStatus = metadataLock.lockWithRetries()
- LOGGER.audit(s" Delete data request has been received " +
- s"for ${ relation.databaseName }.${ relation.tableName }.")
- if (lockStatus) {
- LOGGER.info("Successfully able to get the table metadata file lock")
- }
- else {
- throw new Exception("Table is locked for deletion. Please try after some time")
- }
- val tablePath = CarbonStorePath.getCarbonTablePath(
- carbonTable.getStorePath,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
- val executorErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
- // clean up any stale IUD delta files before starting.
- CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
- if (deleteExecution
- .deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp, relation,
- false, executorErrors)) {
- // call IUD Compaction.
- IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = false)
- }
- } catch {
- case e: HorizontalCompactionException =>
- LOGGER.error("Delete operation succeeded, but horizontal compaction failed." +
- " Please check logs. " + e.getMessage)
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
- case e: Exception =>
- LOGGER.error(e, "Exception in delete data operation " + e.getMessage)
- // In case of failure, clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
- // Null check is required: for executor errors the message is sometimes null
- if (null != e.getMessage) {
- sys.error("Delete data operation failed. " + e.getMessage)
- } else {
- sys.error("Delete data operation failed. Please check logs.")
- }
- } finally {
- if (lockStatus) {
- CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
- }
- }
- Seq.empty
- }
-}
-
-private[sql] case class ProjectForUpdateCommand(
- plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand
- with DataProcessCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
-
-
- val res = plan find {
- case relation: LogicalRelation if relation.relation
- .isInstanceOf[CarbonDatasourceHadoopRelation] =>
- true
- case _ => false
- }
-
- if (res.isEmpty) {
- return Seq.empty
- }
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(deleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val metadataLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- var lockStatus = false
- // current timestamp; the delete and update steps must share the same one.
- val currentTime = CarbonUpdateUtil.readCurrentTime
- var dataSet: DataFrame = null
- val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset()
- try {
- lockStatus = metadataLock.lockWithRetries()
- if (lockStatus) {
- logInfo("Successfully acquired the table metadata file lock")
- } else {
- throw new Exception("Table is locked for update. Please try again later")
- }
- val tablePath = CarbonStorePath.getCarbonTablePath(
- carbonTable.getStorePath,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
- // Materialize the update projection; persist it if enabled so that the
- // delete and load steps do not recompute the plan.
- dataSet = if (isPersistEnabled) {
- Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
- CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
- } else {
- Dataset.ofRows(sparkSession, plan)
- }
- val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
- // clean up any stale IUD delta files before starting.
- CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
- // do delete operation.
- deleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
- currentTime.toString,
- relation, isUpdateOperation = true, executionErrors)
-
- if (executionErrors.failureCauses != FailureCauses.NONE) {
- throw new Exception(executionErrors.errorMsg)
- }
-
- // do update operation.
- UpdateExecution.performUpdate(dataSet, tableIdentifier, plan,
- sparkSession, currentTime, executionErrors)
-
- if (executionErrors.failureCauses != FailureCauses.NONE) {
- throw new Exception(executionErrors.errorMsg)
- }
-
- // Do IUD Compaction.
- IUDCommon.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
- } catch {
- case e: HorizontalCompactionException =>
- LOGGER.error(
- "Update operation succeeded, but horizontal compaction failed. Please check logs. " + e)
- // In case of failure, clean all related delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
- case e: Exception =>
- LOGGER.error("Exception in update operation " + e)
- // In case of failure, clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime.toString)
-
- // report the most specific message available
- if (null != e.getMessage) {
- sys.error("Update operation failed. " + e.getMessage)
- }
- if (null != e.getCause && null != e.getCause.getMessage) {
- sys.error("Update operation failed. " + e.getCause.getMessage)
- }
- sys.error("Update operation failed. Please check logs.")
- } finally {
- if (null != dataSet && isPersistEnabled) {
- dataSet.unpersist()
- }
- if (lockStatus) {
- CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
- }
- }
- Seq.empty
- }
-}
-
-object IUDCommon {
-
- val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
- /**
- * Performs horizontal compaction after an update or delete completes. When
- * called after an UPDATE statement, update-delta compaction runs first,
- * followed by delete-delta compaction; after a DELETE statement only
- * delete-delta compaction runs.
- *
- * @param sparkSession
- * @param carbonRelation
- * @param isUpdateOperation
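- *
- * A minimal call sketch (illustrative; `spark` and `rel` stand in for an
- * active SparkSession and a resolved CarbonRelation):
- * {{{
- *   IUDCommon.tryHorizontalCompaction(spark, rel, isUpdateOperation = false)
- * }}}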
- */
- def tryHorizontalCompaction(sparkSession: SparkSession,
- carbonRelation: CarbonRelation,
- isUpdateOperation: Boolean): Unit = {
-
- if (!CarbonDataMergerUtil.isHorizontalCompactionEnabled()) {
- return
- }
-
- var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
- val carbonTable = carbonRelation.tableMeta.carbonTable
- val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val updateTimeStamp = System.currentTimeMillis()
- // the update and delete timestamps must differ so that status-metadata
- // commits and cleanup can tell the two phases apart.
- val deleteTimeStamp = updateTimeStamp + 1
-
- // get the valid segments
- var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
-
- if (segLists == null || segLists.isEmpty) {
- return
- }
-
- // TODO: avoid re-reading the table status file from disk on every call;
- // load it once up front and pass it through these routines. The
- // SegmentUpdateStatusManager constructor reads the table status and table
- // update status files and caches them in segmentDetails and updateDetails.
- val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
- absTableIdentifier)
-
- if (isUpdateOperation) {
-
- // an update ran, so perform update-delta compaction first.
- compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
- performUpdateDeltaCompaction(sparkSession,
- compactionTypeIUD,
- carbonTable,
- absTableIdentifier,
- segmentUpdateStatusManager,
- updateTimeStamp,
- segLists)
- }
-
- // After Update Compaction perform delete compaction
- compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
- segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
- if (segLists == null || segLists.isEmpty) {
- return
- }
-
- // Delete Compaction
- performDeleteDeltaCompaction(sparkSession,
- compactionTypeIUD,
- carbonTable,
- absTableIdentifier,
- segmentUpdateStatusManager,
- deleteTimeStamp,
- segLists)
- }
-
- /**
- * Update-delta horizontal compaction: merges the update delta files of the
- * qualifying segments.
- *
- * @param sparkSession
- * @param compactionTypeIUD
- * @param carbonTable
- * @param absTableIdentifier
- * @param segmentUpdateStatusManager
- * @param factTimeStamp
- * @param segLists
- */
- private def performUpdateDeltaCompaction(sparkSession: SparkSession,
- compactionTypeIUD: CompactionType,
- carbonTable: CarbonTable,
- absTableIdentifier: AbsoluteTableIdentifier,
- segmentUpdateStatusManager: SegmentUpdateStatusManager,
- factTimeStamp: Long,
- segLists: util.List[String]): Unit = {
- val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
- // get the valid segments qualified for update compaction.
- val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
- absTableIdentifier,
- segmentUpdateStatusManager,
- compactionTypeIUD)
-
- if (validSegList.isEmpty) {
- return
- }
-
- LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].")
- LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].")
-
- try {
- // Update Compaction.
- val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
- carbonTable.getFactTableName,
- Some(segmentUpdateStatusManager),
- CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
- Some(factTimeStamp),
- "")
-
- AlterTableCompaction(altertablemodel).run(sparkSession)
- } catch {
- case e: Exception =>
- val msg = if (null != e.getMessage) {
- e.getMessage
- } else {
- "Please check logs for more info"
- }
- throw new HorizontalCompactionException(
- s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
- }
- LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
- LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
- }
-
- /**
- * Delete-delta horizontal compaction: merges the delete delta files of the
- * qualifying blocks.
- *
- * @param sparkSession
- * @param compactionTypeIUD
- * @param carbonTable
- * @param absTableIdentifier
- * @param segmentUpdateStatusManager
- * @param factTimeStamp
- * @param segLists
- */
- private def performDeleteDeltaCompaction(sparkSession: SparkSession,
- compactionTypeIUD: CompactionType,
- carbonTable: CarbonTable,
- absTableIdentifier: AbsoluteTableIdentifier,
- segmentUpdateStatusManager: SegmentUpdateStatusManager,
- factTimeStamp: Long,
- segLists: util.List[String]): Unit = {
-
- val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
- val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
- absTableIdentifier,
- segmentUpdateStatusManager,
- compactionTypeIUD)
-
- if (deletedBlocksList.isEmpty) {
- return
- }
-
- LOG.info(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
- LOG.audit(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
-
- try {
-
- // Delete Compaction RDD
- val rdd1 = sparkSession.sparkContext
- .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size())
-
- val timestamp = factTimeStamp
- val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
- val result = rdd1.mapPartitions(iter =>
- new Iterator[Seq[CarbonDataMergerUtilResult]] {
- override def hasNext: Boolean = iter.hasNext
-
- override def next(): Seq[CarbonDataMergerUtilResult] = {
- val segmentAndBlocks = iter.next
- val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/"))
- val blockName = segmentAndBlocks
- .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
-
- val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
- absTableIdentifier,
- updateStatusDetails,
- timestamp)
-
- result.asScala.toList
-
- }
- }).collect
-
- val resultList = ListBuffer[CarbonDataMergerUtilResult]()
- result.foreach(_.foreach(resultList += _))
-
- val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava,
- carbonTable,
- timestamp.toString,
- segmentUpdateStatusManager)
- if (!updateStatus) {
- LOG.audit(s"Delete compaction data operation failed for [${db}.${table}].")
- LOG.error("Delete compaction data operation failed.")
- throw new HorizontalCompactionException(
- s"Horizontal Delete Compaction Failed for [${db}.${table}]." +
- s" Please check logs for more info.", factTimeStamp)
- } else {
- LOG.info(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
- LOG.audit(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
- }
- } catch {
- case e: Exception =>
- val msg = if (null != e.getMessage) {
- e.getMessage
- } else {
- "Please check logs for more info"
- }
- throw new HorizontalCompactionException(
- s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
- }
- }
-}
-
-class HorizontalCompactionException(
- message: String,
- // required for cleanup
- val compactionTimeStamp: Long) extends RuntimeException(message)
-
-object deleteExecution {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
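- // Maps the parsed identifier parts onto Spark's TableIdentifier. Illustrative
- // examples (names made up): Seq("db1", "t1") => TableIdentifier("t1", Some("db1"));
- // Seq("t1") => TableIdentifier("t1", None), i.e. the current database.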
- def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
- if (tableIdentifier.size > 1) {
- TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
- } else {
- TableIdentifier(tableIdentifier(0), None)
- }
- }
-
- def deleteDeltaExecution(identifier: Seq[String],
- sparkSession: SparkSession,
- dataRdd: RDD[Row],
- timestamp: String,
- relation: CarbonRelation,
- isUpdateOperation: Boolean,
- executorErrors: ExecutionErrors): Boolean = {
-
- var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null
- val tableName = getTableIdentifier(identifier).table
- val database = getDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
- // the caller already resolved the CarbonRelation, so it is used directly
- // instead of being looked up (and shadowed) again here.
-
- val storeLocation = relation.tableMeta.storePath
- val absoluteTableIdentifier: AbsoluteTableIdentifier =
- new AbsoluteTableIdentifier(storeLocation, relation.tableMeta.carbonTableIdentifier)
- val tablePath = CarbonStorePath
- .getCarbonTablePath(storeLocation, absoluteTableIdentifier.getCarbonTableIdentifier())
- val factPath = tablePath.getFactDir
- val carbonTable = relation.tableMeta.carbonTable
- var deleteStatus = true
- val deleteRdd = if (isUpdateOperation) {
- val schema =
- org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
- org.apache.spark.sql.types.StringType)))
- val rdd = dataRdd
- .map(row => Row(row.get(row.fieldIndex(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
- sparkSession.createDataFrame(rdd, schema).rdd
- } else {
- dataRdd
- }
-
- val (carbonInputFormat, job) =
- QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
- CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
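- // Group the rows to delete by physical location: each row carries an implicit
- // tuple ID, and getSegmentWithBlockFromTID extracts its "segment/blockName"
- // prefix, so all deletes for one block fall into a single group and produce a
- // single delete delta file.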
- val keyRdd = deleteRdd.map { row =>
- val tupleId: String = row
- .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
- (key, row)
- }.groupByKey()
-
- // if there are no loads, there is nothing to do.
- if (keyRdd.partitions.isEmpty) {
- return true
- }
-
- val blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier)
- val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
- CarbonUpdateUtil
- .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
-
- val rowContRdd =
- sparkSession.sparkContext.parallelize(
- blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
- keyRdd.partitions.length)
-
- val rdd = rowContRdd.join(keyRdd)
-
- res = rdd.mapPartitionsWithIndex(
- (index: Int, records: Iterator[(String, (RowCountDetailsVO, Iterable[Row]))]) =>
- Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] {
-
- var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]()
- while (records.hasNext) {
- val (key, (rowCountDetailsVO, groupedRows)) = records.next
- result = result ++
- deleteDeltaFunc(index,
- key,
- groupedRows.toIterator,
- timestamp,
- rowCountDetailsVO)
- }
- result
- }
- ).collect()
-
- // if nothing was processed, there is nothing to commit.
- if (res.isEmpty) {
- return true
- }
-
- // update the status files
- checkAndUpdateStatusFiles
-
- // all or none: the status files are updated only if the complete delete
- // operation succeeded.
- def checkAndUpdateStatusFiles: Unit = {
- val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
- val segmentDetails = new util.HashSet[String]()
- res.foreach(resultOfSeg => resultOfSeg.foreach(
- resultOfBlock => {
- if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) {
- blockUpdateDetailsList.add(resultOfBlock._2._1)
- segmentDetails.add(resultOfBlock._2._1.getSegmentName)
- // if this block is invalid then decrement block count in map.
- if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) {
- CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
- blockMappingVO.getSegmentNumberOfBlockMapping)
- }
- } else {
- deleteStatus = false
- // In case of failure, clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
- LOGGER.audit(s"Delete data operation failed for ${ database }.${ tableName }")
- val errorMsg =
- "Delete data operation failed: could not create a delete delta file for " +
- "segment " + resultOfBlock._2._1.getSegmentName + ", block " +
- resultOfBlock._2._1.getBlockName
- executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
- executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-
- if (executorErrors.failureCauses == FailureCauses.NONE) {
- executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
- executorErrors.errorMsg = errorMsg
- }
- LOGGER.error(errorMsg)
- return
- }
- }
- )
- )
-
- val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
- .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
-
- // this is the delete flow, so no timestamp goes into the status file.
- if (CarbonUpdateUtil
- .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) &&
- CarbonUpdateUtil
- .updateTableMetadataStatus(segmentDetails,
- carbonTable,
- timestamp,
- !isUpdateOperation,
- listOfSegmentToBeMarkedDeleted)
- ) {
- LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
- LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
- }
- else {
- // In case of failure , clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
- val errorMessage = "Delete data operation is failed due to failure " +
- "in table status updation."
- LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
- LOGGER.error("Delete data operation is failed due to failure in table status updation.")
- executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
- executorErrors.errorMsg = errorMessage
- // throw new Exception(errorMessage)
- }
- }
-
- def deleteDeltaFunc(index: Int,
- key: String,
- iter: Iterator[Row],
- timestamp: String,
- rowCountDetailsVO: RowCountDetailsVO):
- Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = {
-
- val result = new DeleteDelataResultImpl()
- var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // here key = segment/blockName
- val blockName = CarbonUpdateUtil
- .getBlockName(
- CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
- val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
- val deleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
- val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] {
- val segmentUpdateDetails = new SegmentUpdateDetails()
- var TID = ""
- var countOfRows = 0
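- // The implicit tuple ID encodes a row's physical address; the TupleIdEnum
- // fields (block, blocklet, page, offset) extracted below locate the row
- // inside this block's delete delta details.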
- try {
- while (iter.hasNext) {
- val oneRow = iter.next
- TID = oneRow
- .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
- val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
- val blockletId = CarbonUpdateUtil
- .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
- val pageId = Integer.parseInt(CarbonUpdateUtil
- .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
- val isValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
- // stop the delete operation: the same row matched more than once
- if (!isValidOffset) {
- executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
- executorErrors.errorMsg = "Multiple input rows matched for same row."
- throw new MultipleMatchingException("Multiple input rows matched for same row.")
- }
- countOfRows = countOfRows + 1
- }
-
- val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
- val completeBlockName = CarbonTablePath
- .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
- CarbonCommonConstants.FACT_FILE_EXT)
- val deleteDeltaPath = CarbonUpdateUtil
- .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
- val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath,
- FileFactory.getFileType(deleteDeltaPath))
-
- segmentUpdateDetails.setBlockName(blockName)
- segmentUpdateDetails.setActualBlockName(completeBlockName)
- segmentUpdateDetails.setSegmentName(segmentId)
- segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
- segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
-
- val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock
- val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
- segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
- if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
- segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
- } else {
- // write the delta file
- carbonDeleteWriter.write(deleteDeltaBlockDetails)
- }
-
- deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- } catch {
- case e: MultipleMatchingException =>
- LOGGER.audit(e.getMessage)
- LOGGER.error(e.getMessage)
- // don't rethrow here; the failure is reported through executorErrors.
- case e: Exception =>
- val errorMsg = s"Delete data operation failed for ${ database }.${ tableName }."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg + e.getMessage)
- throw e
- }
-
- var finished = false
-
- // single-element iterator: hasNext stays idempotent and next() yields
- // exactly one result tuple.
- override def hasNext: Boolean = !finished
-
- override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = {
- finished = true
- result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
- }
- }
- resultIter
- }
- true
- }
-}
-
-
-
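-/**
- * Writes the updated row images back through the internal load path. The update
- * flow is delete-then-load: ProjectForUpdateCommand first writes delete deltas
- * for the matched rows, then performUpdate re-loads the new values via LoadTable.
- */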
-object UpdateExecution {
-
- def performUpdate(
- dataFrame: Dataset[Row],
- tableIdentifier: Seq[String],
- plan: LogicalPlan,
- sparkSession: SparkSession,
- currentTime: Long,
- executorErrors: ExecutionErrors): Unit = {
-
- def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
-
- val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
- val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
- (tableIdentifier.size > 1 &&
- tableIdentifier(0) == dbName &&
- tableIdentifier(1) == tableName) ||
- (tableIdentifier(0) == tableName)
- }
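- // Builds the "fileheader" for the internal load from the update projection:
- // the implicit tupleId column is dropped, and the UPDATED_COL_EXTENSION
- // suffix that the update parser appends to SET columns is stripped.
- // Illustrative example (names made up): Project(a, b<ext>, tupleId) => "a,b".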
- def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
- plan match {
- case Project(pList, _) =>
- pList
- .filter(field => !field.name
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
- col.name
- .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
- } else {
- col.name
- }).mkString(",")
- // avoid a MatchError for non-Project plans
- case _ => ""
- }
- }
- val ex = dataFrame.queryExecution.analyzed
- val res = ex find {
- case relation: LogicalRelation
- if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
- isDestinationRelation(relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) =>
- true
- case _ => false
- }
- val carbonRelation: CarbonDatasourceHadoopRelation = res match {
- case Some(relation: LogicalRelation) =>
- relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
- case _ => sys.error("Update failed: destination carbon table not found in the plan.")
- }
-
- val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
-
- val header = getHeader(carbonRelation, plan)
-
- LoadTable(
- Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
- carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
- null,
- Seq(),
- Map(("fileheader" -> header)),
- false,
- null,
- Some(dataFrame),
- Some(updateTableModel)).run(sparkSession)
-
- executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
- executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
-
- }
-
-}