Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/31 07:00:03 UTC
[05/22] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
deleted file mode 100644
index cb35960..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ /dev/null
@@ -1,842 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
-import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
-import org.apache.carbondata.processing.exception.MultipleMatchingException
-import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.util.QueryPlanUtil
-
-
-/**
- * IUD update delete and compaction framework.
- *
- */
-
-private[sql] case class ProjectForDeleteCommand(
- plan: LogicalPlan,
- identifier: Seq[String],
- timestamp: String) extends RunnableCommand {
-
- val LOG = LogServiceFactory.getLogService(this.getClass.getName)
- var horizontalCompactionFailed = false
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
-
- val dataFrame = DataFrame(sqlContext, plan)
- val dataRdd = dataFrame.rdd
-
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(deleteExecution.getTableIdentifier(identifier))(sqlContext).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val metadataLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- var lockStatus = false
- try {
- lockStatus = metadataLock.lockWithRetries()
- LOG.audit(s" Delete data request has been received " +
- s"for ${ relation.databaseName }.${ relation.tableName }.")
- if (lockStatus) {
- LOG.info("Successfully able to get the table metadata file lock")
- }
- else {
- throw new Exception("Table is locked for deletion. Please try again after some time")
- }
- val tablePath = CarbonStorePath.getCarbonTablePath(
- carbonTable.getStorePath,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
- var executorErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
- // handle the clean up of IUD.
- CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
- if (deleteExecution
- .deleteDeltaExecution(identifier, sqlContext, dataRdd, timestamp, relation,
- false, executorErrors)) {
- // call IUD Compaction.
- IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = false)
- }
- } catch {
- case e: HorizontalCompactionException =>
- LOG.error("Delete operation passed. Exception in Horizontal Compaction." +
- " Please check logs. " + e.getMessage)
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
- case e: Exception =>
- LOG.error("Exception in Delete data operation " + e.getMessage)
- // ****** start clean up.
- // In case of failure, clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
- // clean up. The null check is required because, for executor errors, the message is sometimes null
- if (null != e.getMessage) {
- sys.error("Delete data operation failed. " + e.getMessage)
- }
- else {
- sys.error("Delete data operation failed. Please check logs.")
- }
- } finally {
- if (lockStatus) {
- CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
- }
- }
- Seq.empty
- }
-}
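
ProjectForDeleteCommand above follows a lock-guard shape: acquire the table metadata lock with retries, do the work, and release the lock in a finally block only when it was actually acquired. A minimal self-contained sketch of that shape, assuming a hypothetical MetadataLock trait rather than the real CarbonLockFactory API:

    trait MetadataLock {
      def lockWithRetries(): Boolean
      def unlock(): Boolean
    }

    def withMetadataLock[T](lock: MetadataLock)(body: => T): T = {
      // Acquire with retries; fail fast so a concurrent writer is reported clearly.
      if (!lock.lockWithRetries()) {
        throw new Exception("Table is locked. Please try again after some time")
      }
      try {
        body
      } finally {
        // Release only on the path where the lock was actually acquired.
        lock.unlock()
      }
    }
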
-
-private[sql] case class ProjectForUpdateCommand(
- plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand {
- val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
-
- val res = plan find {
- case relation: LogicalRelation if (relation.relation
- .isInstanceOf[CarbonDatasourceRelation]) =>
- true
- case _ => false
- }
-
- if (!res.isDefined) {
- return Seq.empty
- }
-
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val metadataLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- var lockStatus = false
- // get the current timestamp, which must be the same for delete and update.
- val currentTime = CarbonUpdateUtil.readCurrentTime
- var dataFrame: DataFrame = null
- val isPersistEnabledUserValue = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.isPersistEnabled,
- CarbonCommonConstants.defaultValueIsPersistEnabled)
- var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
- if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
- isPersistEnabled = false
- }
- else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) {
- isPersistEnabled = true
- }
- try {
- lockStatus = metadataLock.lockWithRetries()
- if (lockStatus) {
- logInfo("Successfully able to get the table metadata file lock")
- }
- else {
- throw new Exception("Table is locked for update. Please try again after some time")
- }
- val tablePath = CarbonStorePath.getCarbonTablePath(
- carbonTable.getStorePath,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
- // Get RDD.
- dataFrame = if (isPersistEnabled) {
- DataFrame(sqlContext, plan)
- .persist(StorageLevel.MEMORY_AND_DISK)
- }
- else {
- DataFrame(sqlContext, plan)
- }
- var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
-
- // handle the clean up of IUD.
- CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
- // do delete operation.
- deleteExecution.deleteDeltaExecution(tableIdentifier, sqlContext, dataFrame.rdd,
- currentTime + "",
- relation, isUpdateOperation = true, executionErrors)
-
- if (executionErrors.failureCauses != FailureCauses.NONE) {
- throw new Exception(executionErrors.errorMsg)
- }
-
- // do update operation.
- UpdateExecution.performUpdate(dataFrame, tableIdentifier, plan,
- sqlContext, currentTime, executionErrors)
-
- if (executionErrors.failureCauses != FailureCauses.NONE) {
- throw new Exception(executionErrors.errorMsg)
- }
-
- // Do IUD Compaction.
- IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = true)
- }
-
- catch {
- case e: HorizontalCompactionException =>
- LOGGER.error(
- "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
- // In case of failure , clean all related delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
- case e: Exception =>
- LOGGER.error("Exception in update operation: " + e)
- // ****** start clean up.
- // In case of failure, clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
-
- // ****** end clean up.
- if (null != e.getMessage) {
- sys.error("Update operation failed. " + e.getMessage)
- }
- else if (null != e.getCause && null != e.getCause.getMessage) {
- sys.error("Update operation failed. " + e.getCause.getMessage)
- }
- else {
- sys.error("Update operation failed. Please check logs.")
- }
- }
- finally {
- if (null != dataFrame && isPersistEnabled) {
- dataFrame.unpersist()
- }
- if (lockStatus) {
- CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
- }
- }
- Seq.empty
- }
-}
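
ProjectForUpdateCommand reads the persist-enable property and keeps the default unless the user value is exactly 'true' or 'false'. A compact sketch of that parse, with a hypothetical helper name:

    // Hypothetical helper: tolerate malformed values by keeping the default.
    def parseBooleanProperty(userValue: String, default: Boolean): Boolean =
      Option(userValue).map(_.trim.toLowerCase) match {
        case Some("true") => true
        case Some("false") => false
        case _ => default
      }
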
-
-object IUDCommon {
-
- val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
- /**
- * Performs horizontal compaction. It is invoked after an Update or Delete
- * operation completes: when called after an Update statement, update compaction
- * runs first, followed by delete compaction; when called after a Delete
- * statement, only delete compaction runs.
- *
- * @param sqlContext
- * @param carbonRelation
- * @param isUpdateOperation
- */
- def tryHorizontalCompaction(sqlContext: SQLContext,
- carbonRelation: CarbonRelation,
- isUpdateOperation: Boolean): Unit = {
-
- val isHorizontalCompactionEnabled = CarbonDataMergerUtil.isHorizontalCompactionEnabled()
-
- if (!isHorizontalCompactionEnabled) {
- return
- }
-
- var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
- val carbonTable = carbonRelation.tableMeta.carbonTable
- val (db, table) = (carbonTable.getDatabaseName, carbonTable.getFactTableName)
- val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val updateTimeStamp = System.currentTimeMillis()
- // Make sure the update and delete timestamps are not the same; distinct
- // timestamps are required for the status metadata commit and for cleanup
- val deleteTimeStamp = updateTimeStamp + 1
-
- // get the valid segments
- var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
-
- if (segLists == null || segLists.size() == 0) {
- return
- }
-
- // Should avoid reading the Table Status file from disk every time; better to load it
- // into memory once at the start and pass it along to the routines. The constructor of
- // SegmentUpdateStatusManager reads the Table Status file and the Table Update Status
- // file and saves their contents in segmentDetails and updateDetails respectively.
- val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
- absTableIdentifier)
-
- if (isUpdateOperation) {
-
- // This is only update operation, perform only update compaction.
- compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
- performUpdateDeltaCompaction(sqlContext,
- compactionTypeIUD,
- carbonTable,
- absTableIdentifier,
- segmentUpdateStatusManager,
- updateTimeStamp,
- segLists)
- }
-
- // After Update Compaction perform delete compaction
- compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
- segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
- if (segLists == null || segLists.size() == 0) {
- return
- }
-
- // Delete Compaction
- performDeleteDeltaCompaction(sqlContext,
- compactionTypeIUD,
- carbonTable,
- absTableIdentifier,
- segmentUpdateStatusManager,
- deleteTimeStamp,
- segLists)
- }
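
The method above derives two distinct timestamps so the update-delta pass and the delete-delta pass never commit status metadata under the same value, and the delete pass always runs after an update pass. A self-contained sketch of that sequencing (the two perform* functions are hypothetical stand-ins for the private methods below):

    def horizontalCompaction(isUpdateOperation: Boolean): Unit = {
      val updateTimeStamp = System.currentTimeMillis()
      // +1 guarantees the two passes commit under different timestamps
      val deleteTimeStamp = updateTimeStamp + 1

      if (isUpdateOperation) performUpdatePass(updateTimeStamp)
      // The delete pass runs after every update, and alone after a delete.
      performDeletePass(deleteTimeStamp)
    }

    def performUpdatePass(ts: Long): Unit = println(s"update-delta compaction at $ts")
    def performDeletePass(ts: Long): Unit = println(s"delete-delta compaction at $ts")
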
-
- /**
- * Update Delta Horizontal Compaction.
- *
- * @param sqlContext
- * @param compactionTypeIUD
- * @param carbonTable
- * @param absTableIdentifier
- * @param segLists
- */
- private def performUpdateDeltaCompaction(sqlContext: SQLContext,
- compactionTypeIUD: CompactionType,
- carbonTable: CarbonTable,
- absTableIdentifier: AbsoluteTableIdentifier,
- segmentUpdateStatusManager: SegmentUpdateStatusManager,
- factTimeStamp: Long,
- segLists: util.List[String]): Unit = {
- val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
- // get the valid segments qualified for update compaction.
- val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
- absTableIdentifier,
- segmentUpdateStatusManager,
- compactionTypeIUD)
-
- if (validSegList.size() == 0) {
- return
- }
-
- LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].")
- LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].")
-
- try {
- // Update Compaction.
- val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
- carbonTable.getFactTableName,
- Some(segmentUpdateStatusManager),
- CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
- Some(factTimeStamp),
- "")
-
- AlterTableCompaction(altertablemodel).run(sqlContext)
- }
- catch {
- case e: Exception =>
- val msg = if (null != e.getMessage) {
- e.getMessage
- } else {
- "Please check logs for more info"
- }
- throw new HorizontalCompactionException(
- s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
- }
- LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
- LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
- }
-
- /**
- * Delete Delta Horizontal Compaction.
- *
- * @param sqlContext
- * @param compactionTypeIUD
- * @param carbonTable
- * @param absTableIdentifier
- * @param segLists
- */
- private def performDeleteDeltaCompaction(sqlContext: SQLContext,
- compactionTypeIUD: CompactionType,
- carbonTable: CarbonTable,
- absTableIdentifier: AbsoluteTableIdentifier,
- segmentUpdateStatusManager: SegmentUpdateStatusManager,
- factTimeStamp: Long,
- segLists: util.List[String]): Unit = {
-
- val db = carbonTable.getDatabaseName
- val table = carbonTable.getFactTableName
- val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
- absTableIdentifier,
- segmentUpdateStatusManager,
- compactionTypeIUD)
-
- if (deletedBlocksList.size() == 0) {
- return
- }
-
- LOG.info(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
- LOG.audit(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
-
- try {
-
- // Delete Compaction RDD
- val rdd1 = sqlContext.sparkContext
- .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size())
-
- val timestamp = factTimeStamp
- val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
- val result = rdd1.mapPartitions(iter =>
- new Iterator[Seq[CarbonDataMergerUtilResult]] {
- override def hasNext: Boolean = iter.hasNext
-
- override def next(): Seq[CarbonDataMergerUtilResult] = {
- val segmentAndBlocks = iter.next
- val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/"))
- val blockName = segmentAndBlocks
- .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
-
- val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
- absTableIdentifier,
- updateStatusDetails,
- timestamp)
-
- result.asScala.toList
-
- }
- }).collect
-
- val resultList = ListBuffer[CarbonDataMergerUtilResult]()
- result.foreach(x => {
- x.foreach(y => {
- resultList += y
- })
- })
-
- val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava,
- carbonTable,
- timestamp.toString,
- segmentUpdateStatusManager)
- if (!updateStatus) {
- LOG.audit(s"Delete Compaction data operation failed for [${db}.${table}].")
- LOG.error("Delete Compaction data operation failed.")
- throw new HorizontalCompactionException(
- s"Horizontal Delete Compaction Failed for [${db}.${table}] ." +
- s" Please check logs for more info.", factTimeStamp)
- }
- else {
- LOG.info(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
- LOG.audit(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
- }
- }
- catch {
- case e: Exception =>
- val msg = if (null != e.getMessage) {
- e.getMessage
- } else {
- "Please check logs for more info"
- }
- throw new HorizontalCompactionException(
- s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
- }
- }
-}
-
-class HorizontalCompactionException(
- message: String,
- // required for cleanup
- val compactionTimeStamp: Long) extends RuntimeException(message) {
-}
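
HorizontalCompactionException carries the compaction timestamp precisely so callers can remove the delta files written under the failed compaction, as both command classes above do. A hedged usage sketch, with a caller-supplied cleanup function standing in for the CarbonUpdateUtil.cleanStaleDeltaFiles call:

    def runWithCleanup(compact: () => Unit, cleanStaleDeltaFiles: String => Unit): Unit = {
      try {
        compact()
      } catch {
        case e: HorizontalCompactionException =>
          // The data operation itself already succeeded; only compaction failed,
          // so clean the files written under that compaction's timestamp.
          cleanStaleDeltaFiles(e.compactionTimeStamp.toString)
      }
    }
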
-
-object deleteExecution {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
- def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
- if (tableIdentifier.size > 1) {
- TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
- } else {
- TableIdentifier(tableIdentifier(0), None)
- }
- }
-
- def deleteDeltaExecution(identifier: Seq[String],
- sqlContext: SQLContext,
- dataRdd: RDD[Row],
- timestamp: String, relation: CarbonRelation, isUpdateOperation: Boolean,
- executorErrors: ExecutionErrors): Boolean = {
-
- var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null
- val tableName = getTableIdentifier(identifier).table
- val database = getDB.getDatabaseName(getTableIdentifier(identifier).database, sqlContext)
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(getTableIdentifier(identifier))(sqlContext).
- asInstanceOf[CarbonRelation]
-
- val storeLocation = relation.tableMeta.storePath
- val absoluteTableIdentifier: AbsoluteTableIdentifier = new
- AbsoluteTableIdentifier(storeLocation,
- relation.tableMeta.carbonTableIdentifier)
- val tablePath = CarbonStorePath
- .getCarbonTablePath(storeLocation,
- absoluteTableIdentifier.getCarbonTableIdentifier())
- val tableUpdateStatusPath = tablePath.getTableUpdateStatusFilePath
- val totalSegments =
- SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath).length
- val factPath = tablePath.getFactDir
-
- val carbonTable = relation.tableMeta.carbonTable
- var deleteStatus = true
- val deleteRdd = if (isUpdateOperation) {
- val schema =
- org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
- org.apache.spark.sql.types.StringType)))
- val rdd = dataRdd
- .map(row => Row(row.get(row.fieldIndex(
- CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
- sqlContext.createDataFrame(rdd, schema).rdd
- } else {
- dataRdd
- }
-
- val (carbonInputFormat, job) =
- QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
-
- val keyRdd = deleteRdd.map({ row =>
- val tupleId: String = row
- .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
- (key, row)
- }).groupByKey()
-
- // if no loads are present then no need to do anything.
- if (keyRdd.partitions.size == 0) {
- return true
- }
-
- var blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier)
- val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
- CarbonUpdateUtil
- .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
-
- val rowContRdd = sqlContext.sparkContext
- .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
- keyRdd.partitions.size)
-
- val rdd = rowContRdd.join(keyRdd)
-
- res = rdd.mapPartitionsWithIndex(
- (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
- Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] {
-
- var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]()
- while (records.hasNext) {
- val ((key), (rowCountDetailsVO, groupedRows)) = records.next
- result = result ++
- deleteDeltaFunc(index,
- key,
- groupedRows.toIterator,
- timestamp,
- rowCountDetailsVO)
-
- }
- result
- }
- ).collect()
-
- // if no loads are present then no need to do anything.
- if (res.isEmpty) {
- return true
- }
-
- // update new status file
- checkAndUpdateStatusFiles
-
- // all or none: update the status file only if the complete delete operation is successful.
- def checkAndUpdateStatusFiles: Unit = {
- val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
- val segmentDetails = new util.HashSet[String]()
- res.foreach(resultOfSeg => resultOfSeg.foreach(
- resultOfBlock => {
- if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) {
- blockUpdateDetailsList.add(resultOfBlock._2._1)
- segmentDetails.add(resultOfBlock._2._1.getSegmentName)
- // if this block is invalid then decrement block count in map.
- if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) {
- CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
- blockMappingVO.getSegmentNumberOfBlockMapping)
- }
- }
- else {
- deleteStatus = false
- // In case of failure , clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
- LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
- val errorMsg =
- "Delete data operation is failed due to failure in creating delete delta file for " +
- "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
- resultOfBlock._2._1.getBlockName
- executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
- executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-
- if (executorErrors.failureCauses == FailureCauses.NONE) {
- executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
- executorErrors.errorMsg = errorMsg
- }
- LOGGER.error(errorMsg)
- return
- }
- }
- )
- )
-
- val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
- .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
-
-
-
- // this is the delete flow, so there is no need to put a timestamp in the status file.
- if (CarbonUpdateUtil
- .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) &&
- CarbonUpdateUtil
- .updateTableMetadataStatus(segmentDetails,
- carbonTable,
- timestamp,
- !isUpdateOperation,
- listOfSegmentToBeMarkedDeleted)
- ) {
- LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
- LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
- }
- else {
- // In case of failure , clean all related delete delta files
- CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
- val errorMessage = "Delete data operation failed due to failure " +
- "in updating the table status."
- LOGGER.audit(s"Delete data operation failed for ${ database }.${ tableName }")
- LOGGER.error("Delete data operation failed due to failure in updating the table status.")
- executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
- executorErrors.errorMsg = errorMessage
- // throw new Exception(errorMessage)
- }
- }
-
- def deleteDeltaFunc(index: Int,
- key: String,
- iter: Iterator[Row],
- timestamp: String,
- rowCountDetailsVO: RowCountDetailsVO):
- Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = {
-
- val result = new DeleteDelataResultImpl()
- var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- // here key = segment/blockName
- val blockName = CarbonUpdateUtil
- .getBlockName(
- CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
- val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
- var deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
- val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] {
- val segmentUpdateDetails = new SegmentUpdateDetails()
- var TID = ""
- var countOfRows = 0
- try {
- while (iter.hasNext) {
- val oneRow = iter.next
- TID = oneRow
- .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
- val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
- val blockletId = CarbonUpdateUtil
- .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
- val pageId = Integer.parseInt(CarbonUpdateUtil
- .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
- val isValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
- // stop the delete operation
- if (!isValidOffset) {
- executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
- executorErrors.errorMsg = "Multiple input rows matched for same row."
- throw new MultipleMatchingException("Multiple input rows matched for same row.")
- }
- countOfRows = countOfRows + 1
- }
-
- val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
- val completeBlockName = CarbonTablePath
- .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
- CarbonCommonConstants.FACT_FILE_EXT)
- val deleteDeltaPath = CarbonUpdateUtil
- .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
- val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeltaPath,
- FileFactory.getFileType(deleteDeltaPath))
-
-
-
- segmentUpdateDetails.setBlockName(blockName)
- segmentUpdateDetails.setActualBlockName(completeBlockName)
- segmentUpdateDetails.setSegmentName(segmentId)
- segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
- segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
-
- val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock
- val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
- segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
- if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
- segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
- }
- else {
- // write the delta file
- carbonDeleteWriter.write(deleteDeltaBlockDetails)
- }
-
- deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- } catch {
- case e : MultipleMatchingException =>
- LOGGER.audit(e.getMessage)
- LOGGER.error(e.getMessage)
- // don't throw the exception here.
- case e: Exception =>
- val errorMsg = s"Delete data operation failed for ${ database }.${ tableName }."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg + e.getMessage)
- throw e
- }
-
-
- var finished = false
-
- // one-shot iterator: hasNext is true exactly once, then false
- override def hasNext: Boolean = {
- if (!finished) {
- finished = true
- true
- }
- else {
- false
- }
- }
-
- override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = {
- finished = true
- result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
- }
- }
- resultIter
- }
- true
- }
-}
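
deleteDeltaExecution keys each row by "segment/blockName" (via getSegmentWithBlockFromTID) and deleteDeltaFunc later splits that key back apart on the last separator. A small round-trip sketch, assuming '/' as the separator in place of CarbonCommonConstants.FILE_SEPARATOR and an illustrative block name:

    def splitSegmentKey(key: String): (String, String) = {
      // key has the form "segmentId/blockName"; split on the last '/'
      val idx = key.lastIndexOf('/')
      (key.substring(0, idx), key.substring(idx + 1))
    }

    // e.g. splitSegmentKey("2/part-0-0") == ("2", "part-0-0")
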
-
-
-
-object UpdateExecution {
-
- def performUpdate(
- dataFrame: DataFrame,
- tableIdentifier: Seq[String],
- plan: LogicalPlan,
- sqlContext: SQLContext,
- currentTime: Long,
- executorErrors: ExecutionErrors): Unit = {
-
- def isDestinationRelation(relation: CarbonDatasourceRelation): Boolean = {
-
- val tableName = relation.getTable()
- val dbName = relation.getDatabaseName()
- (tableIdentifier.size > 1 &&
- tableIdentifier(0) == dbName &&
- tableIdentifier(1) == tableName) ||
- (tableIdentifier(0) == tableName)
- }
- def getHeader(relation: CarbonDatasourceRelation, plan: LogicalPlan): String = {
- var header = ""
-
- plan match {
- case Project(pList, _) =>
- header = pList
- .filter(field => !field.name
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
- col.name
- .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
- }
- else {
- col.name
- }).mkString(",")
- case _ => // not a Project node: header stays empty
- }
- header
- }
- val ex = dataFrame.queryExecution.analyzed
- val res = ex find {
- case relation: LogicalRelation if (relation.relation.isInstanceOf[CarbonDatasourceRelation] &&
- isDestinationRelation(relation.relation
- .asInstanceOf[CarbonDatasourceRelation])) =>
- true
- case _ => false
- }
- val carbonRelation: CarbonDatasourceRelation = res match {
- case Some(relation: LogicalRelation) =>
- relation.relation.asInstanceOf[CarbonDatasourceRelation]
- case _ => sys.error("Destination carbon table relation not found in the update plan")
- }
-
- val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
-
- val header = getHeader(carbonRelation, plan)
-
-
-
- LoadTable(
- Some(carbonRelation.getDatabaseName()),
- carbonRelation.getTable(),
- null,
- Seq(),
- Map(("fileheader" -> header)),
- false,
- null,
- Some(dataFrame),
- Some(updateTableModel)).run(sqlContext)
-
-
- executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
- executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
-
- Seq.empty
-
- }
-
-}
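
performUpdate's getHeader builds the load header by dropping the implicit tuple-id column and stripping the updated-column suffix from renamed columns. A self-contained sketch of that derivation; the column name and suffix literals are illustrative assumptions, not the real constant values:

    def buildHeader(columns: Seq[String],
        tupleIdCol: String = "tupleId", // stand-in for CARBON_IMPLICIT_COLUMN_TUPLEID
        updatedSuffix: String = "-updatedColumn" // stand-in for UPDATED_COL_EXTENSION
        ): String = {
      columns
        .filterNot(_.equalsIgnoreCase(tupleIdCol))
        .map { name =>
          if (name.endsWith(updatedSuffix)) {
            name.substring(0, name.lastIndexOf(updatedSuffix))
          } else {
            name
          }
        }
        .mkString(",")
    }

    // buildHeader(Seq("id", "name-updatedColumn", "tupleId")) == "id,name"
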
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
deleted file mode 100644
index 9814cc2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import scala.language.implicitConversions
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
-import org.apache.spark.sql.hive.CarbonMetastore
-import org.apache.spark.sql.types.TimestampType
-import org.apache.spark.util.{CausedBy, FileUtils}
-import org.codehaus.jackson.map.ObjectMapper
-
-import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.exception.InvalidConfigurationException
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
-
-object Checker {
- def validateTableExists(
- dbName: Option[String],
- tableName: String,
- sqlContext: SQLContext): Unit = {
- val identifier = TableIdentifier(tableName, dbName)
- if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(sqlContext)) {
- val err = s"table $dbName.$tableName not found"
- LogServiceFactory.getLogService(this.getClass.getName).error(err)
- throw new IllegalArgumentException(err)
- }
- }
-}
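
Checker.validateTableExists is invoked at the top of the segment-management commands below so a missing table fails fast with an IllegalArgumentException before any lock or metadata work starts. A minimal sketch of the same fail-fast shape, decoupled from the metastore (the exists flag is supplied by the caller):

    def validateExists(exists: Boolean, dbName: Option[String], tableName: String): Unit = {
      if (!exists) {
        // Fail before any locks are taken or metadata is touched.
        val err = s"table ${ dbName.map(_ + ".").getOrElse("") }$tableName not found"
        throw new IllegalArgumentException(err)
      }
    }
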
-
-/**
- * Command for show table partitions Command
- *
- * @param tableIdentifier
- */
-private[sql] case class ShowCarbonPartitionsCommand(
- tableIdentifier: TableIdentifier) extends RunnableCommand {
- val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
- override val output = CommonUtil.partitionInfoOutput
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(tableIdentifier)(sqlContext).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val tableName = carbonTable.getFactTableName
- val partitionInfo = carbonTable.getPartitionInfo(
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
- if (partitionInfo == null) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
- }
- val partitionType = partitionInfo.getPartitionType
- val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
- LOGGER.info("partition column name:" + columnName)
- CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
- }
-}
-
-/**
- * Command for the compaction in alter table command
- *
- * @param alterTableModel
- */
-private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
- RunnableCommand {
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- // TODO : Implement it.
- val tableName = alterTableModel.tableName
- val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
- if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) {
- logError(s"alter table failed. table not found: $databaseName.$tableName")
- sys.error(s"alter table failed. table not found: $databaseName.$tableName")
- }
-
- val relation =
- CarbonEnv.get.carbonMetastore
- .lookupRelation1(Option(databaseName), tableName)(sqlContext)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $databaseName.$tableName does not exist")
- }
- val carbonLoadModel = new CarbonLoadModel()
-
-
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setTableName(table.getFactTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.storePath)
-
- var storeLocation = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
- System.getProperty("java.io.tmpdir")
- )
- storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
- try {
- CarbonDataRDDFactory.alterTableForCompaction(sqlContext,
- alterTableModel,
- carbonLoadModel,
- relation.tableMeta.storePath,
- storeLocation
- )
- } catch {
- case e: Exception =>
- if (null != e.getMessage) {
- sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
- } else {
- sys.error("Exception in compaction. Please check logs for more info.")
- }
- }
- Seq.empty
- }
-}
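
AlterTableCompaction builds a per-invocation temporary store location under a configurable base directory, using System.nanoTime() to keep concurrent compaction runs from colliding. The same construction in isolation:

    def tempStoreLocation(base: String = System.getProperty("java.io.tmpdir")): String = {
      // nanoTime gives each compaction run its own scratch directory
      base + "/carbonstore/" + System.nanoTime()
    }
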
-
-case class CreateTable(cm: TableModel) extends RunnableCommand {
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sqlContext)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
-
- val tableInfo: TableInfo = TableNewProcessor(cm)
-
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
- s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.isEmpty) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sqlContext.tableNames(dbName).exists(_.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- // Add Database to catalog and persist
- val catalog = CarbonEnv.get.carbonMetastore
- // Need to fill partitioner class when we support partition
- val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName, null)(sqlContext)
- try {
- sqlContext.sql(
- s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
- s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
- .collect
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
-
- CarbonEnv.get.carbonMetastore
- .dropTable(catalog.storePath, identifier)(sqlContext)
-
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
-
- LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
- }
-
- Seq.empty
- }
-
- def setV(ref: Any, name: String, value: Any): Unit = {
- ref.getClass.getFields.find(_.getName == name).get
- .set(ref, value.asInstanceOf[AnyRef])
- }
-}
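
CreateTable validates the sort_scope table property before anything is persisted. A minimal stand-in for the CarbonUtil.isValidSortOption check, with the valid values taken from the error message above:

    val validSortScopes = Set("NO_SORT", "BATCH_SORT", "LOCAL_SORT", "GLOBAL_SORT")

    def validateSortScope(sortScope: String): Unit = {
      if (!validSortScopes.contains(sortScope.toUpperCase)) {
        throw new IllegalArgumentException(
          s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are " +
          validSortScopes.mkString("'", "', '", "'"))
      }
    }
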
-
-private[sql] case class DeleteLoadsById(
- loadids: Seq[String],
- databaseNameOp: Option[String],
- tableName: String) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
- val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
- tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- CarbonStore.deleteLoadById(
- loadids,
- getDB.getDatabaseName(databaseNameOp, sqlContext),
- tableName,
- carbonTable
- )
- Seq.empty
-
- }
-
- // validates load ids
- private def validateLoadIds: Unit = {
- if (loadids.isEmpty) {
- val errorMessage = "Error: Segment id(s) should not be empty."
- throw new MalformedCarbonCommandException(errorMessage)
-
- }
- }
-}
-
-private[sql] case class DeleteLoadsByLoadDate(
- databaseNameOp: Option[String],
- tableName: String,
- dateField: String,
- loadDate: String) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
- val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
- tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- CarbonStore.deleteLoadByDate(
- loadDate,
- getDB.getDatabaseName(databaseNameOp, sqlContext),
- tableName,
- carbonTable
- )
- Seq.empty
-
- }
-
-}
-
-object LoadTable {
-
- def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- noDictDimension: Array[CarbonDimension]): Unit = {
-
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
- model.table)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
-
- // read TableInfo
- val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
-
- // modify TableInfo
- val columns = tableInfo.getFact_table.getTable_columns
- for (i <- 0 until columns.size) {
- if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
- columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
- }
- }
-
- // write TableInfo
- CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
- // update Metadata
- val catalog = CarbonEnv.get.carbonMetastore
- catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
- model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
- // update CarbonDataLoadSchema
- val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
- model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
- }
-
-}
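
LoadTable.updateTableMetadata rewrites the thrift schema by removing the DICTIONARY encoding from every column that was promoted to no-dictionary, then persists the schema file. A simplified, hypothetical mirror of that loop using a plain case class instead of the thrift types:

    import scala.collection.mutable

    case class ColumnSchema(columnId: String, encoders: mutable.Set[String])

    def removeDictionaryEncoding(columns: Seq[ColumnSchema], noDictColumnIds: Set[String]): Unit = {
      columns.foreach { col =>
        if (noDictColumnIds.contains(col.columnId)) {
          // mirrors columns.get(i).encoders.remove(Encoding.DICTIONARY)
          col.encoders -= "DICTIONARY"
        }
      }
    }
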
-
-private[sql] case class LoadTableByInsert(relation: CarbonDatasourceRelation,
- child: LogicalPlan, isOverwriteExist: Boolean) extends RunnableCommand {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- def run(sqlContext: SQLContext): Seq[Row] = {
- val df = new DataFrame(sqlContext, child)
- val header = relation.carbonRelation.output.map(_.name).mkString(",")
- val load = LoadTable(
- Some(relation.carbonRelation.databaseName),
- relation.carbonRelation.tableName,
- null,
- Seq(),
- scala.collection.immutable.Map("fileheader" -> header),
- isOverwriteExist,
- null,
- Some(df)).run(sqlContext)
- // update the relation metadata; needed when high-cardinality columns are auto-detected
- relation.carbonRelation.metaData =
- CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
- load
- }
-}
-case class LoadTable(
- databaseNameOp: Option[String],
- tableName: String,
- factPathFromUser: String,
- dimFilesPath: Seq[DataLoadTableFileMapping],
- options: scala.collection.immutable.Map[String, String],
- isOverwriteExist: Boolean,
- var inputSqlString: String = null,
- dataFrame: Option[DataFrame] = None,
- updateModel: Option[UpdateTableModel] = None) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- private def checkDefaultValue(value: String, default: String) = if (StringUtils.isEmpty(value)) {
- default
- } else {
- value
- }
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- if (dataFrame.isDefined && !updateModel.isDefined) {
- val rdd = dataFrame.get.rdd
- if (rdd.partitions == null || rdd.partitions.length == 0) {
- LOGGER.warn("DataLoading finished. No data was loaded.")
- return Seq.empty
- }
- }
-
- val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
- logError(s"Data loading failed. table not found: $dbName.$tableName")
- LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
- sys.error(s"Data loading failed. table not found: $dbName.$tableName")
- }
-
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(Option(dbName), tableName)(sqlContext)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
- .getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK
- )
- try {
- // take lock only in case of normal data load.
- if (!updateModel.isDefined) {
- if (carbonLock.lockWithRetries()) {
- logInfo("Successfully able to get the table metadata file lock")
- } else {
- sys.error("Table is locked for updation. Please try after some time")
- }
- }
-
- val factPath = if (dataFrame.isDefined) {
- ""
- } else {
- FileUtils.getPaths(
- CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
- }
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.storePath)
-
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setTableName(table.getFactTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-
- val partitionLocation = relation.tableMeta.storePath + "/partition/" +
- relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
- relation.tableMeta.carbonTableIdentifier.getTableName + "/"
-
-
- val columnar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-
- val delimiter = options.getOrElse("delimiter", ",")
- val quoteChar = options.getOrElse("quotechar", "\"")
- var fileHeader = options.getOrElse("fileheader", "")
- val escapeChar = options.getOrElse("escapechar", "\\")
- val commentchar = options.getOrElse("commentchar", "#")
- val columnDict = options.getOrElse("columndict", null)
- val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
- val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
- val badRecordActionValue = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
- val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
- val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
- val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
- val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
- val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
- val dateFormat = options.getOrElse("dateformat", null)
- ValidateUtil.validateDateFormat(dateFormat, table, tableName)
- val maxColumns = options.getOrElse("maxcolumns", null)
- val tableProperties = table.getTableInfo.getFactTable.getTableProperties
- val sortScopeDefault = CarbonProperties.getInstance().
- getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- val sortScope = if (null == tableProperties) {
- sortScopeDefault
- } else {
- tableProperties.getOrDefault("sort_scope", sortScopeDefault)
- }
-
- ValidateUtil.validateSortScope(table, sortScope)
- val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
- val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
- val globalSortPartitions = options.getOrElse("global_sort_partitions", carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
- ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
-
- // if there is no file header in the csv file and the load sql doesn't provide the
- // FILEHEADER option, we should use the table schema to generate the file header.
- val headerOption = options.get("header")
- if (headerOption.isDefined) {
- // whether the csv file has file header
- // the default value is true
- val header = try {
- headerOption.get.toBoolean
- } catch {
- case ex: IllegalArgumentException =>
- throw new MalformedCarbonCommandException(
- "'header' option should be either 'true' or 'false'. " + ex.getMessage)
- }
- header match {
- case true =>
- if (fileHeader.nonEmpty) {
- throw new MalformedCarbonCommandException(
- "When 'header' option is true, 'fileheader' option is not required.")
- }
- case false =>
- // generate file header
- if (fileHeader.isEmpty) {
- fileHeader = table.getCreateOrderColumn(table.getFactTableName)
- .asScala.map(_.getColName).mkString(",")
- }
- }
- }
-
- val bad_record_path = options.getOrElse("bad_record_path",
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- if (badRecordsLoggerEnable.toBoolean ||
- LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
- if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
- sys.error("Invalid bad records location.")
- }
- }
- carbonLoadModel.setBadRecordsLocation(bad_record_path)
- carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
- carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
- carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
- carbonLoadModel.setDateFormat(dateFormat)
- carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
- carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
- carbonLoadModel
- .setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat)
- carbonLoadModel
- .setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
- carbonLoadModel
- .setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
- carbonLoadModel
- .setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
- carbonLoadModel.setSortScope(sortScope)
- carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
- carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
- // when single_pass=true and all-dictionary load is not used
- val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
- case "true" =>
- true
- case "false" =>
- // when single_pass = false, if either all_dictionary_path
- // or columndict is configured then do not allow the load
- if (StringUtils.isNotEmpty(allDictionaryPath) || StringUtils.isNotEmpty(columnDict)) {
- throw new MalformedCarbonCommandException(
- "Can not use all_dictionary_path or columndict without single_pass.")
- } else {
- false
- }
- case illegal =>
- LOGGER.error(s"Can't use single_pass because an illegal value was found: [$illegal]. " +
- "Please set it to 'true' or 'false'")
- false
- }
- carbonLoadModel.setUseOnePass(useOnePass)
-
- if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
- complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
- delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
- sys.error(s"Field Delimiter & Complex types delimiter are same")
- }
- else {
- carbonLoadModel.setComplexDelimiterLevel1(
- CarbonUtil.delimiterConverter(complex_delimiter_level_1))
- carbonLoadModel.setComplexDelimiterLevel2(
- CarbonUtil.delimiterConverter(complex_delimiter_level_2))
- }
- // set the all-dictionary path
- carbonLoadModel.setAllDictPath(allDictionaryPath)
-
- val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-
- try {
- // The system has to partition the data first and then invoke the data load
- LOGGER.info(s"Initiating Direct Load for the Table: ($dbName.$tableName)")
- carbonLoadModel.setFactFilePath(factPath)
- carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
- carbonLoadModel.setCsvHeader(fileHeader)
- carbonLoadModel.setColDictFilePath(columnDict)
- carbonLoadModel.setDirectLoad(true)
- carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
- val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
- maxColumns)
- carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
- GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
- val storePath = relation.tableMeta.storePath
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- .getCarbonTableIdentifier
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier)
- val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
- val dimensions = carbonTable.getDimensionByTableName(
- carbonTable.getFactTableName).asScala.toArray
- // add the start entry for the new load in the table status file
- if (!updateModel.isDefined) {
- CommonUtil.
- readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteExist)
- }
- if (isOverwriteExist) {
- LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName")
- }
- if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- }
- if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
- StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) {
- LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
- LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
- carbonLoadModel.setUseOnePass(false)
- }
- if (carbonLoadModel.getUseOnePass) {
- val colDictFilePath = carbonLoadModel.getColDictFilePath
- if (!StringUtils.isEmpty(colDictFilePath)) {
- carbonLoadModel.initPredefDictMap()
- // generate predefined dictionary
- GlobalDictionaryUtil
- .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
- dimensions, carbonLoadModel, sqlContext, storePath, dictFolderPath)
- }
- val allDictPath: String = carbonLoadModel.getAllDictPath
- if (!StringUtils.isEmpty(allDictPath)) {
- carbonLoadModel.initPredefDictMap()
- GlobalDictionaryUtil
- .generateDictionaryFromDictionaryFiles(sqlContext,
- carbonLoadModel,
- storePath,
- carbonTableIdentifier,
- dictFolderPath,
- dimensions,
- allDictionaryPath)
- }
- // dictionaryServerClient dictionary generator
- val dictionaryServerPort = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
- CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
- val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
- carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
- // start the dictionary server when one-pass load is used and a dimension with
- // DICTIONARY (but not DIRECT_DICTIONARY) encoding is present.
- val allDimensions = table.getAllDimensions.asScala.toList
- val createDictionary = allDimensions.exists {
- carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
- }
- val server: Option[DictionaryServer] = if (createDictionary) {
- val dictionaryServer = DictionaryServer
- .getInstance(dictionaryServerPort.toInt, carbonTable)
- carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
- sqlContext.sparkContext.addSparkListener(new SparkListener() {
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
- dictionaryServer.shutdown()
- }
- })
- Some(dictionaryServer)
- } else {
- None
- }
- CarbonDataRDDFactory.loadCarbonData(sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- columnar,
- partitionStatus,
- server,
- isOverwriteExist,
- dataFrame,
- updateModel)
- } else {
- val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
- val fields = dataFrame.get.schema.fields
- import org.apache.spark.sql.functions.udf
- // extract only the segment id from the tupleId
- val getSegIdUDF = udf((tupleId: String) =>
- CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
- // take all fields except the tupleId field, as it is not required in the value
- var otherFields = fields.toSeq
- .filter(field => !field.name
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- .map(field => {
- // note: the '&& false' short-circuit leaves this rename branch dead,
- // so field names are always passed through unchanged below
- if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
- new Column(field.name
- .substring(0,
- field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
- } else {
- new Column(field.name)
- }
- })
-
- // extract tupleId field which will be used as a key
- val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
- .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
- as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
- // use dataFrameWithoutTupleId as dictionaryDataFrame
- val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
- otherFields = otherFields :+ segIdColumn
- // use dataFrameWithTupleId as loadDataFrame
- val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
- (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
- } else {
- (dataFrame, dataFrame)
- }
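- // for updates, dictionary generation runs on the frame without the tupleId
- // column, while the actual load uses the frame with the derived segment id
- // appended; for plain loads both sides are simply the incoming dataFrame.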
- GlobalDictionaryUtil
- .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
- dictionaryDataFrame)
- CarbonDataRDDFactory.loadCarbonData(sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- columnar,
- partitionStatus,
- None,
- isOverwriteExist,
- loadDataFrame,
- updateModel)
- }
- } catch {
- case CausedBy(ex: NoRetryException) =>
- LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
- throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
- case ex: Exception =>
- LOGGER.error(ex)
- LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
- throw ex
- } finally {
- // whether or not the load succeeded, delete the unwanted partition files
- try {
- val fileType = FileFactory.getFileType(partitionLocation)
- if (FileFactory.isFileExist(partitionLocation, fileType)) {
- val file = FileFactory
- .getCarbonFile(partitionLocation, fileType)
- CarbonUtil.deleteFoldersAndFiles(file)
- }
- } catch {
- case ex: Exception =>
- LOGGER.error(ex)
- LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
- "Problem deleting the partition folder")
- throw ex
- }
-
- }
- } catch {
- case dle: DataLoadingException =>
- LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
- throw dle
- case mce: MalformedCarbonCommandException =>
- LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
- throw mce
- } finally {
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- logInfo("Table MetaData Unlocked Successfully after data load")
- } else {
- logError("Unable to unlock Table MetaData")
- }
- }
- }
- Seq.empty
- }
-
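- /**
- * Strips the DICTIONARY encoding from the given no-dictionary dimensions in the
- * persisted thrift schema, refreshes the schema modified time, and reloads the
- * cached metadata and CarbonDataLoadSchema so later steps see the new encodings.
- */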
- private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- noDictDimension: Array[CarbonDimension]): Unit = {
-
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
- model.table)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
-
- // read TableInfo
- val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
-
- // modify TableInfo
- val columns = tableInfo.getFact_table.getTable_columns
- for (i <- 0 until columns.size) {
- if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
- columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
- }
- }
-
- // write TableInfo
- CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
- val catalog = CarbonEnv.get.carbonMetastore
-
- // update the schema modified time
- catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
- carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName))
-
- // update Metadata
- catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
- model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
- // update CarbonDataLoadSchema
- val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
- model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
- }
-
-}
-
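-/**
- * Drops a carbon table: acquires the metadata and drop-table locks, removes the
- * table from the metastore, and finally deletes any leftover metadata files.
- * Illustrative usage (assumed syntax): DROP TABLE IF EXISTS db.t
- */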
-private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],
- tableName: String)
- extends RunnableCommand {
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- val identifier = TableIdentifier(tableName, Option(dbName))
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
- val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
- val catalog = CarbonEnv.get.carbonMetastore
- val storePath = catalog.storePath
- try {
- locksToBeAcquired foreach {
- lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
- }
- LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
- CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext)
- LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
- } catch {
- case ex: Exception =>
- LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
- sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
- } finally {
- if (carbonLocks.nonEmpty) {
- val unlocked = carbonLocks.forall(_.unlock())
- if (unlocked) {
- logInfo("Table MetaData Unlocked Successfully")
- // deleting any remaining files.
- val metadataFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
- val fileType = FileFactory.getFileType(metadataFilePath)
- if (FileFactory.isFileExist(metadataFilePath, fileType)) {
- val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
- CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
- }
- }
- }
- }
- Seq.empty
- }
-}
-
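-/**
- * Backs the segment listing command (assumed syntax: SHOW SEGMENTS FOR TABLE db.t),
- * returning up to `limit` segment rows read via CarbonStore.showSegments.
- */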
-private[sql] case class ShowLoads(
- databaseNameOp: Option[String],
- tableName: String,
- limit: Option[String],
- override val output: Seq[Attribute]) extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
- val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
- tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- CarbonStore.showSegments(
- getDB.getDatabaseName(databaseNameOp, sqlContext),
- tableName,
- limit,
- carbonTable.getMetaDataFilepath
- )
- }
-}
-
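-/**
- * Produces the formatted DESCRIBE output: one row per column (key columns with
- * their dictionary/inverted-index encodings, measures), followed by detailed
- * table, column-property and column-group sections.
- */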
-private[sql] case class DescribeCommandFormatted(
- child: SparkPlan,
- override val output: Seq[Attribute],
- tblIdentifier: TableIdentifier)
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
- val mapper = new ObjectMapper()
- val colProps = StringBuilder.newBuilder
- var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
- val comment = if (relation.metaData.dims.contains(field.name)) {
- val dimension = relation.metaData.carbonTable.getDimensionByName(
- relation.tableMeta.carbonTableIdentifier.getTableName,
- field.name)
- if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
- colProps.append(field.name).append(".")
- .append(mapper.writeValueAsString(dimension.getColumnProperties))
- .append(",")
- }
- if (dimension.hasEncoding(Encoding.DICTIONARY)) {
- "DICTIONARY, KEY COLUMN" +
- (if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) "" else ",NOINVERTEDINDEX")
- } else {
- "KEY COLUMN" +
- (if (dimension.hasEncoding(Encoding.INVERTED_INDEX)) "" else ",NOINVERTEDINDEX")
- }
- } else {
- "MEASURE"
- }
- (field.name, field.dataType.simpleString, comment)
- }
- val colPropStr = if (colProps.toString().trim().length() > 0) {
- // drop the additional trailing comma
- colProps.toString().dropRight(1)
- } else {
- colProps.toString()
- }
- results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
- results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
- .getDatabaseName, "")
- )
- results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
- results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
- val carbonTable = relation.tableMeta.carbonTable
- results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
- results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
- .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
- .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
- if (colPropStr.length() > 0) {
- results ++= Seq((colPropStr, "", ""))
- } else {
- results ++= Seq(("ADAPTIVE", "", ""))
- }
- results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
- relation.tableMeta.carbonTableIdentifier.getTableName).asScala
- .map(column => column).mkString(","), ""))
- val dimension = carbonTable
- .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- results ++= getColumnGroups(dimension.asScala.toList)
- if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
- results ++=
- Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
- .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
- }
- results.map { case (name, dataType, comment) =>
- Row(f"$name%-36s $dataType%-80s $comment%-72s")
- }
- }
-
- private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
- var results: Seq[(String, String, String)] =
- Seq(("", "", ""), ("##Column Group Information", "", ""))
- val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
- case (groupId, _) => groupId != -1
- }.toSeq.sortBy(_._1)
- val groups = groupedDimensions.map { case (_, dims) =>
- dims.map(_.getColName).mkString(", ")
- }
- groups.zipWithIndex.foreach { case (group, i) =>
- results = results :+ (s"Column Group ${i + 1}", group, "")
- }
- results
- }
-}
-
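-/**
- * Deletes loads whose data is older than the given date value on a timestamp
- * column of the table; fails if the field is missing or not a timestamp.
- */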
-private[sql] case class DeleteLoadByDate(
- databaseNameOp: Option[String],
- tableName: String,
- dateField: String,
- dateValue: String
-) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
- LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
- val identifier = TableIdentifier(tableName, Option(dbName))
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
- var level: String = ""
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
- if (relation == null) {
- LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
- filter => filter.name.equalsIgnoreCase(dateField) &&
- filter.dataType.isInstanceOf[TimestampType]).toList
- if (matches.isEmpty) {
- LOGGER.audit("The delete load by date is failed. " +
- s"Table $dbName.$tableName does not contain date field: $dateField")
- sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
- } else {
- level = matches.head.name
- }
- val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
- .getColName
- DataManagementFunc.deleteLoadByDate(
- sqlContext,
- new CarbonDataLoadSchema(carbonTable),
- dbName,
- tableName,
- CarbonEnv.get.carbonMetastore.storePath,
- level,
- actualColName,
- dateValue)
- LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
- Seq.empty
- }
-
-}
-
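-/**
- * Backs the clean-files command (assumed syntax: CLEAN FILES FOR TABLE db.t),
- * which physically removes segment folders left over after deletion/compaction.
- */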
-private[sql] case class CleanFiles(
- databaseNameOp: Option[String],
- tableName: String) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sqlContext: SQLContext): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
- val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
- tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- CarbonStore.cleanFiles(
- getDB.getDatabaseName(databaseNameOp, sqlContext),
- tableName,
- sqlContext.asInstanceOf[CarbonContext].storePath,
- carbonTable,
- false)
- Seq.empty
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
deleted file mode 100644
index d23b18f..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-/**
- * Insert into carbon table from other source
- */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
- // Wait until children are resolved.
- case p: LogicalPlan if !p.childrenResolved => p
-
- case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
- if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
- castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
- }
-
- def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
- : LogicalPlan = {
- if (relation.carbonRelation.output.size > CarbonCommonConstants
- .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
- sys.error("Maximum number of columns supported by carbon is: " +
- CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
- }
- if (child.output.size >= relation.carbonRelation.output.size) {
- InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
- } else {
- sys.error("Cannot insert into target table because the number of columns differs")
- }
- }
-}
-
-
-object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
-
- var sqlContext: SQLContext = _
-
- def init(sqlContext: SQLContext) {
- this.sqlContext = sqlContext
- }
-
- private def processUpdateQuery(
- table: UnresolvedRelation,
- columns: List[String],
- selectStmt: String,
- filter: String): LogicalPlan = {
- var includedDestColumns = false
- var includedDestRelation = false
- var addedTupleId = false
-
- def prepareTargetRelation(relation: UnresolvedRelation): Subquery = {
- val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
- Seq.empty, isDistinct = false), "tupleId")())
- val projList = Seq(
- UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
- // include the tuple id and the rest of the required columns in the subquery
- Subquery(table.alias.getOrElse(""), Project(projList, relation))
- }
- // get the un-analyzed logical plan
- val targetTable = prepareTargetRelation(table)
- val selectPlan = org.apache.spark.sql.SQLParser.parse(selectStmt, sqlContext) transform {
- case Project(projectList, child) if (!includedDestColumns) =>
- includedDestColumns = true
- if (projectList.size != columns.size) {
- sys.error("The number of source and destination columns does not match")
- }
- val renamedProjectList = projectList.zip(columns).map { case (attr, col) =>
- attr match {
- case UnresolvedAlias(child) =>
- UnresolvedAlias(Alias(child, col + "-updatedColumn")())
- case _ => attr
- }
- }
- val list = Seq(
- UnresolvedAlias(UnresolvedStar(table.alias))) ++ renamedProjectList
- Project(list, child)
- case Filter(cond, child) if (!includedDestRelation) =>
- includedDestRelation = true
- Filter(cond, Join(child, targetTable, Inner, None))
- case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
- t != table.tableIdentifier) =>
- includedDestRelation = true
- Join(r, targetTable, Inner, None)
- }
- val updatedSelectPlan = if (!includedDestRelation) {
- // special case to handle self join queries
- // Eg. update tableName SET (column1) = (column1+1)
- selectPlan transform {
- case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
- !addedTupleId) =>
- addedTupleId = true
- targetTable
- }
- } else {
- selectPlan
- }
- val finalPlan = if (filter.length > 0) {
- val alias = table.alias.getOrElse("")
- var transformed: Boolean = false
- // Create a dummy projection to include filter conditions
- SQLParser.parse("select * from " +
- table.tableIdentifier.mkString(".") + " " + alias + " " + filter, sqlContext) transform {
- case UnresolvedRelation(t, Some(a)) if (
- !transformed && t == table.tableIdentifier && a == alias) =>
- transformed = true
- // Add the filter condition of update statement on destination table
- Subquery(alias, updatedSelectPlan)
- }
- } else {
- updatedSelectPlan
- }
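- // illustrative walk-through: for UPDATE t SET (c1) = (c1 + 1), the parsed select
- // plan has been joined back to the target table (or, for self joins, reuses the
- // target relation directly) with the update filter layered on top; it only
- // remains to wrap it in a ProjectForUpdate over the resolved destination table.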
- val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
- val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
- val destinationTable = UnresolvedRelation(tidSeq, table.alias)
- ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
- }
-
- def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
- val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
- val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
- var addedTupleId = false
- val selectPlan = SQLParser.parse(selectStmt, sqlContext) transform {
- case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
- !addedTupleId) =>
- addedTupleId = true
- val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
- Seq.empty, isDistinct = false), "tupleId")())
- val projList = Seq(
- UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
- // include the tuple id in the subquery
- Project(projList, relation)
- }
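- // illustrative walk-through: a delete statement arrives here as a select over the
- // table; the transform above adds a getTupleId() projection so that each matching
- // row can be identified, and the plan, table identifier and a current-timestamp
- // string are handed to ProjectForDeleteCommand.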
- ProjectForDeleteCommand(
- selectPlan,
- tidSeq,
- System.currentTimeMillis().toString)
- }
-
- override def apply(logicalPlan: LogicalPlan): LogicalPlan = {
- logicalPlan transform {
- case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where)
- case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table)
- }
- }
-}