You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/10/19 10:33:10 UTC

[5/9] carbondata git commit: [CARBONDATA-1597] Remove spark1 integration

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
deleted file mode 100644
index cb35960..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ /dev/null
@@ -1,842 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
-import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatusManager, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
-import org.apache.carbondata.processing.exception.MultipleMatchingException
-import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.util.QueryPlanUtil
-
-
-/**
- * IUD update delete and compaction framework.
- *
- */
-
-private[sql] case class ProjectForDeleteCommand(
-     plan: LogicalPlan,
-     identifier: Seq[String],
-     timestamp: String) extends RunnableCommand {
-
-  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-  var horizontalCompactionFailed = false
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-
-    val dataFrame = DataFrame(sqlContext, plan)
-    val dataRdd = dataFrame.rdd
-
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(deleteExecution.getTableIdentifier(identifier))(sqlContext).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
-    val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK)
-    var lockStatus = false
-    try {
-      lockStatus = metadataLock.lockWithRetries()
-      LOG.audit(s" Delete data request has been received " +
-                s"for ${ relation.databaseName }.${ relation.tableName }.")
-      if (lockStatus) {
-        LOG.info("Successfully able to get the table metadata file lock")
-      }
-      else {
-        throw new Exception("Table is locked for deletion. Please try after some time")
-      }
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getStorePath,
-        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
-      var executorErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
-        // handle the clean up of IUD.
-        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
-          if (deleteExecution
-            .deleteDeltaExecution(identifier, sqlContext, dataRdd, timestamp, relation,
-              false, executorErrors)) {
-            // call IUD Compaction.
-            IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = false)
-          }
-    } catch {
-      case e: HorizontalCompactionException =>
-          LOG.error("Delete operation passed. Exception in Horizontal Compaction." +
-              " Please check logs. " + e.getMessage)
-          CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
-      case e: Exception =>
-        LOG.error("Exception in Delete data operation " + e.getMessage)
-        // ****** start clean up.
-        // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
-        // clean up. Null check is required as for executor error some times message is null
-        if (null != e.getMessage) {
-          sys.error("Delete data operation is failed. " + e.getMessage)
-        }
-        else {
-          sys.error("Delete data operation is failed. Please check logs.")
-        }
-    } finally {
-      if (lockStatus) {
-        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
-      }
-    }
-    Seq.empty
-  }
-}
-
-private[sql] case class ProjectForUpdateCommand(
-    plan: LogicalPlan, tableIdentifier: Seq[String]) extends RunnableCommand {
-  val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-
-    val res = plan find {
-      case relation: LogicalRelation if (relation.relation
-        .isInstanceOf[CarbonDatasourceRelation]) =>
-        true
-      case _ => false
-    }
-
-    if (!res.isDefined) {
-      return Seq.empty
-    }
-
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(deleteExecution.getTableIdentifier(tableIdentifier))(sqlContext).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
-    val metadataLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK)
-    var lockStatus = false
-    // get the current time stamp which should be same for delete and update.
-    val currentTime = CarbonUpdateUtil.readCurrentTime
-    var dataFrame: DataFrame = null
-    val isPersistEnabledUserValue = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.isPersistEnabled,
-        CarbonCommonConstants.defaultValueIsPersistEnabled)
-   var isPersistEnabled = CarbonCommonConstants.defaultValueIsPersistEnabled.toBoolean
-    if (isPersistEnabledUserValue.equalsIgnoreCase("false")) {
-      isPersistEnabled = false
-    }
-    else if (isPersistEnabledUserValue.equalsIgnoreCase("true")) {
-      isPersistEnabled = true
-    }
-    try {
-      lockStatus = metadataLock.lockWithRetries()
-      if (lockStatus) {
-        logInfo("Successfully able to get the table metadata file lock")
-      }
-      else {
-        throw new Exception("Table is locked for updation. Please try after some time")
-      }
-      val tablePath = CarbonStorePath.getCarbonTablePath(
-        carbonTable.getStorePath,
-        carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier)
-        // Get RDD.
-        dataFrame = if (isPersistEnabled) {
-          DataFrame(sqlContext, plan)
-            .persist(StorageLevel.MEMORY_AND_DISK)
-        }
-        else {
-          DataFrame(sqlContext, plan)
-        }
-        var executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
-
-        // handle the clean up of IUD.
-        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
-        // do delete operation.
-        deleteExecution.deleteDeltaExecution(tableIdentifier, sqlContext, dataFrame.rdd,
-          currentTime + "",
-        relation, isUpdateOperation = true, executionErrors)
-
-        if(executionErrors.failureCauses != FailureCauses.NONE) {
-          throw new Exception(executionErrors.errorMsg)
-        }
-
-        // do update operation.
-        UpdateExecution.performUpdate(dataFrame, tableIdentifier, plan,
-          sqlContext, currentTime, executionErrors)
-
-        if(executionErrors.failureCauses != FailureCauses.NONE) {
-          throw new Exception(executionErrors.errorMsg)
-        }
-
-        // Do IUD Compaction.
-        IUDCommon.tryHorizontalCompaction(sqlContext, relation, isUpdateOperation = true)
-    }
-
-    catch {
-      case e: HorizontalCompactionException =>
-        LOGGER.error(
-            "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
-        // In case of failure , clean all related delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
-
-      case e: Exception =>
-        LOGGER.error("Exception in update operation" + e)
-        // ****** start clean up.
-        // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
-
-        // *****end clean up.
-        if (null != e.getMessage) {
-          sys.error("Update operation failed. " + e.getMessage)
-        }
-        if (null != e.getCause && null != e.getCause.getMessage) {
-          sys.error("Update operation failed. " + e.getCause.getMessage)
-        }
-        sys.error("Update operation failed. please check logs.")
-    }
-    finally {
-      if (null != dataFrame && isPersistEnabled) {
-        dataFrame.unpersist()
-      }
-      if (lockStatus) {
-        CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
-      }
-    }
-    Seq.empty
-  }
-}
-
-object IUDCommon {
-
-  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
-
-  /**
-   * The method does horizontal compaction. After Update and Delete completion
-   * tryHorizontal compaction will be called. In case this method is called after
-   * Update statement then Update Compaction followed by Delete Compaction will be
-   * processed whereas for tryHorizontalCompaction called after Delete statement
-   * then only Delete Compaction will be processed.
-    *
-    * @param sqlContext
-   * @param carbonRelation
-   * @param isUpdateOperation
-   */
-  def tryHorizontalCompaction(sqlContext: SQLContext,
-      carbonRelation: CarbonRelation,
-      isUpdateOperation: Boolean): Unit = {
-
-    var ishorizontalCompaction = CarbonDataMergerUtil.isHorizontalCompactionEnabled()
-
-    if (ishorizontalCompaction == false) {
-      return
-    }
-
-    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
-    val carbonTable = carbonRelation.tableMeta.carbonTable
-    val (db, table) = (carbonTable.getDatabaseName, carbonTable.getFactTableName)
-    val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val updateTimeStamp = System.currentTimeMillis()
-    // To make sure that update and delete timestamps are not same,
-    // required to commit to status metadata and cleanup
-    val deleteTimeStamp = updateTimeStamp + 1
-
-    // get the valid segments
-    var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
-
-    if (segLists == null || segLists.size() == 0) {
-      return
-    }
-
-    // Should avoid reading Table Status file from Disk every time. Better to load it
-    // in-memory at the starting and pass it along the routines. The constructor of
-    // SegmentUpdateStatusManager reads the Table Status File and Table Update Status
-    // file and save the content in segmentDetails and updateDetails respectively.
-    val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
-      absTableIdentifier)
-
-    if (isUpdateOperation == true) {
-
-      // This is only update operation, perform only update compaction.
-      compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
-      performUpdateDeltaCompaction(sqlContext,
-        compactionTypeIUD,
-        carbonTable,
-        absTableIdentifier,
-        segmentUpdateStatusManager,
-        updateTimeStamp,
-        segLists)
-    }
-
-    // After Update Compaction perform delete compaction
-    compactionTypeIUD = CompactionType.IUD_DELETE_DELTA_COMPACTION
-    segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
-    if (segLists == null || segLists.size() == 0) {
-      return
-    }
-
-    // Delete Compaction
-    performDeleteDeltaCompaction(sqlContext,
-      compactionTypeIUD,
-      carbonTable,
-      absTableIdentifier,
-      segmentUpdateStatusManager,
-      deleteTimeStamp,
-      segLists)
-  }
-
-  /**
-   * Update Delta Horizontal Compaction.
-    *
-    * @param sqlContext
-   * @param compactionTypeIUD
-   * @param carbonTable
-   * @param absTableIdentifier
-   * @param segLists
-   */
-  private def performUpdateDeltaCompaction(sqlContext: SQLContext,
-      compactionTypeIUD: CompactionType,
-      carbonTable: CarbonTable,
-      absTableIdentifier: AbsoluteTableIdentifier,
-      segmentUpdateStatusManager: SegmentUpdateStatusManager,
-      factTimeStamp: Long,
-      segLists: util.List[String]): Unit = {
-    val db = carbonTable.getDatabaseName
-    val table = carbonTable.getFactTableName
-    // get the valid segments qualified for update compaction.
-    val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
-      absTableIdentifier,
-      segmentUpdateStatusManager,
-      compactionTypeIUD)
-
-    if (validSegList.size() == 0) {
-      return
-    }
-
-    LOG.info(s"Horizontal Update Compaction operation started for [${db}.${table}].")
-    LOG.audit(s"Horizontal Update Compaction operation started for [${db}.${table}].")
-
-    try {
-      // Update Compaction.
-      val altertablemodel = AlterTableModel(Option(carbonTable.getDatabaseName),
-        carbonTable.getFactTableName,
-        Some(segmentUpdateStatusManager),
-        CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString,
-        Some(factTimeStamp),
-        "")
-
-      AlterTableCompaction(altertablemodel).run(sqlContext)
-    }
-    catch {
-      case e: Exception =>
-        val msg = if (null != e.getMessage) {
-          e.getMessage
-        } else {
-          "Please check logs for more info"
-        }
-        throw new HorizontalCompactionException(
-          s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
-    }
-    LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
-    LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
-  }
-
-  /**
-   * Delete Delta Horizontal Compaction.
-    *
-    * @param sqlContext
-   * @param compactionTypeIUD
-   * @param carbonTable
-   * @param absTableIdentifier
-   * @param segLists
-   */
-  private def performDeleteDeltaCompaction(sqlContext: SQLContext,
-      compactionTypeIUD: CompactionType,
-      carbonTable: CarbonTable,
-      absTableIdentifier: AbsoluteTableIdentifier,
-      segmentUpdateStatusManager: SegmentUpdateStatusManager,
-      factTimeStamp: Long,
-      segLists: util.List[String]): Unit = {
-
-    val db = carbonTable.getDatabaseName
-    val table = carbonTable.getFactTableName
-    val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
-      absTableIdentifier,
-      segmentUpdateStatusManager,
-      compactionTypeIUD)
-
-    if (deletedBlocksList.size() == 0) {
-      return
-    }
-
-    LOG.info(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
-    LOG.audit(s"Horizontal Delete Compaction operation started for [${db}.${table}].")
-
-    try {
-
-      // Delete Compaction RDD
-      val rdd1 = sqlContext.sparkContext
-        .parallelize(deletedBlocksList.asScala.toSeq, deletedBlocksList.size())
-
-      val timestamp = factTimeStamp
-      val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
-      val result = rdd1.mapPartitions(iter =>
-        new Iterator[Seq[CarbonDataMergerUtilResult]] {
-          override def hasNext: Boolean = iter.hasNext
-
-          override def next(): Seq[CarbonDataMergerUtilResult] = {
-            val segmentAndBlocks = iter.next
-            val segment = segmentAndBlocks.substring(0, segmentAndBlocks.lastIndexOf("/"))
-            val blockName = segmentAndBlocks
-              .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
-
-            val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
-              absTableIdentifier,
-              updateStatusDetails,
-              timestamp)
-
-            result.asScala.toList
-
-          }
-        }).collect
-
-      val resultList = ListBuffer[CarbonDataMergerUtilResult]()
-      result.foreach(x => {
-        x.foreach(y => {
-          resultList += y
-        })
-      })
-
-      val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava,
-        carbonTable,
-        timestamp.toString,
-        segmentUpdateStatusManager)
-      if (updateStatus == false) {
-        LOG.audit(s"Delete Compaction data operation is failed for [${db}.${table}].")
-        LOG.error("Delete Compaction data operation is failed.")
-        throw new HorizontalCompactionException(
-          s"Horizontal Delete Compaction Failed for [${db}.${table}] ." +
-          s" Please check logs for more info.", factTimeStamp)
-      }
-      else {
-        LOG.info(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
-        LOG.audit(s"Horizontal Delete Compaction operation completed for [${db}.${table}].")
-      }
-    }
-    catch {
-      case e: Exception =>
-        val msg = if (null != e.getMessage) {
-          e.getMessage
-        } else {
-          "Please check logs for more info"
-        }
-        throw new HorizontalCompactionException(
-          s"Horizontal Delete Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
-    }
-  }
-}
-
-class HorizontalCompactionException(
-    message: String,
-    // required for cleanup
-    val compactionTimeStamp: Long) extends RuntimeException(message) {
-}
-
-object deleteExecution {
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
-  def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
-    if (tableIdentifier.size > 1) {
-      TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
-    } else {
-      TableIdentifier(tableIdentifier(0), None)
-    }
-  }
-
-  def deleteDeltaExecution(identifier: Seq[String],
-                           sqlContext: SQLContext,
-                           dataRdd: RDD[Row],
-                           timestamp: String, relation: CarbonRelation, isUpdateOperation: Boolean,
-                           executorErrors: ExecutionErrors): Boolean = {
-
-    var res: Array[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] = null
-    val tableName = getTableIdentifier(identifier).table
-    val database = getDB.getDatabaseName(getTableIdentifier(identifier).database, sqlContext)
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(getTableIdentifier(identifier))(sqlContext).
-      asInstanceOf[CarbonRelation]
-
-    val storeLocation = relation.tableMeta.storePath
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new
-        AbsoluteTableIdentifier(storeLocation,
-          relation.tableMeta.carbonTableIdentifier)
-    var tablePath = CarbonStorePath
-      .getCarbonTablePath(storeLocation,
-        absoluteTableIdentifier.getCarbonTableIdentifier())
-    var tableUpdateStatusPath = tablePath.getTableUpdateStatusFilePath
-    val totalSegments =
-      SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath).length
-    var factPath = tablePath.getFactDir
-
-    var carbonTable = relation.tableMeta.carbonTable
-    var deleteStatus = true
-    val deleteRdd = if (isUpdateOperation) {
-      val schema =
-        org.apache.spark.sql.types.StructType(Seq(org.apache.spark.sql.types.StructField(
-          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
-          org.apache.spark.sql.types.StringType)))
-      val rdd = dataRdd
-        .map(row => Row(row.get(row.fieldIndex(
-          CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
-      sqlContext.createDataFrame(rdd, schema).rdd
-    } else {
-      dataRdd
-    }
-
-    val (carbonInputFormat, job) =
-      QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
-
-    val keyRdd = deleteRdd.map({ row =>
-      val tupleId: String = row
-        .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-      val key = CarbonUpdateUtil.getSegmentWithBlockFromTID(tupleId)
-      (key, row)
-    }).groupByKey()
-
-    // if no loads are present then no need to do anything.
-    if (keyRdd.partitions.size == 0) {
-      return true
-    }
-
-    var blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier)
-    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
-    CarbonUpdateUtil
-      .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
-
-    val rowContRdd = sqlContext.sparkContext
-      .parallelize(blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
-        keyRdd.partitions.size)
-
-    val rdd = rowContRdd.join(keyRdd)
-
-    res = rdd.mapPartitionsWithIndex(
-      (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
-        Iterator[List[(String, (SegmentUpdateDetails, ExecutionErrors))]] {
-
-          var result = List[(String, (SegmentUpdateDetails, ExecutionErrors))]()
-          while (records.hasNext) {
-            val ((key), (rowCountDetailsVO, groupedRows)) = records.next
-            result = result ++
-              deleteDeltaFunc(index,
-                key,
-                groupedRows.toIterator,
-                timestamp,
-                rowCountDetailsVO)
-
-          }
-          result
-        }
-    ).collect()
-
-    // if no loads are present then no need to do anything.
-    if (res.isEmpty) {
-      return true
-    }
-
-    // update new status file
-    checkAndUpdateStatusFiles
-
-    // all or none : update status file, only if complete delete opeartion is successfull.
-    def checkAndUpdateStatusFiles: Unit = {
-      val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
-      val segmentDetails = new util.HashSet[String]()
-      res.foreach(resultOfSeg => resultOfSeg.foreach(
-        resultOfBlock => {
-          if (resultOfBlock._1.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) {
-            blockUpdateDetailsList.add(resultOfBlock._2._1)
-            segmentDetails.add(resultOfBlock._2._1.getSegmentName)
-            // if this block is invalid then decrement block count in map.
-            if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getStatus)) {
-              CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
-                blockMappingVO.getSegmentNumberOfBlockMapping)
-            }
-          }
-          else {
-            deleteStatus = false
-            // In case of failure , clean all related delete delta files
-            CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-            LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
-            val errorMsg =
-              "Delete data operation is failed due to failure in creating delete delta file for " +
-                "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
-                resultOfBlock._2._1.getBlockName
-            executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
-            executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-
-            if (executorErrors.failureCauses == FailureCauses.NONE) {
-              executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-              executorErrors.errorMsg = errorMsg
-            }
-            LOGGER.error(errorMsg)
-            return
-          }
-        }
-      )
-      )
-
-      val listOfSegmentToBeMarkedDeleted = CarbonUpdateUtil
-        .getListOfSegmentsToMarkDeleted(blockMappingVO.getSegmentNumberOfBlockMapping)
-
-
-
-      // this is delete flow so no need of putting timestamp in the status file.
-      if (CarbonUpdateUtil
-        .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) &&
-        CarbonUpdateUtil
-          .updateTableMetadataStatus(segmentDetails,
-            carbonTable,
-            timestamp,
-            !isUpdateOperation,
-            listOfSegmentToBeMarkedDeleted)
-      ) {
-        LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
-        LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
-      }
-      else {
-        // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
-        val errorMessage = "Delete data operation is failed due to failure " +
-          "in table status updation."
-        LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
-        LOGGER.error("Delete data operation is failed due to failure in table status updation.")
-        executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
-        executorErrors.errorMsg = errorMessage
-        // throw new Exception(errorMessage)
-      }
-    }
-
-    def deleteDeltaFunc(index: Int,
-                        key: String,
-                        iter: Iterator[Row],
-                        timestamp: String,
-                        rowCountDetailsVO: RowCountDetailsVO):
-    Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] = {
-
-      val result = new DeleteDelataResultImpl()
-      var deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-      // here key = segment/blockName
-      val blockName = CarbonUpdateUtil
-        .getBlockName(
-          CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
-      val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
-      var deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
-      val resultIter = new Iterator[(String, (SegmentUpdateDetails, ExecutionErrors))] {
-        val segmentUpdateDetails = new SegmentUpdateDetails()
-        var TID = ""
-        var countOfRows = 0
-        try {
-          while (iter.hasNext) {
-            val oneRow = iter.next
-            TID = oneRow
-              .get(oneRow.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).toString
-            val offset = CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.OFFSET)
-            val blockletId = CarbonUpdateUtil
-              .getRequiredFieldFromTID(TID, TupleIdEnum.BLOCKLET_ID)
-            val pageId = Integer.parseInt(CarbonUpdateUtil
-              .getRequiredFieldFromTID(TID, TupleIdEnum.PAGE_ID))
-            val IsValidOffset = deleteDeltaBlockDetails.addBlocklet(blockletId, offset, pageId)
-            // stop delete operation
-            if(!IsValidOffset) {
-              executorErrors.failureCauses = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING
-              executorErrors.errorMsg = "Multiple input rows matched for same row."
-              throw new MultipleMatchingException("Multiple input rows matched for same row.")
-            }
-            countOfRows = countOfRows + 1
-          }
-
-          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
-          val completeBlockName = CarbonTablePath
-            .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
-              CarbonCommonConstants.FACT_FILE_EXT)
-          val deleteDeletaPath = CarbonUpdateUtil
-            .getDeleteDeltaFilePath(blockPath, blockName, timestamp)
-          val carbonDeleteWriter = new CarbonDeleteDeltaWriterImpl(deleteDeletaPath,
-            FileFactory.getFileType(deleteDeletaPath))
-
-
-
-          segmentUpdateDetails.setBlockName(blockName)
-          segmentUpdateDetails.setActualBlockName(completeBlockName)
-          segmentUpdateDetails.setSegmentName(segmentId)
-          segmentUpdateDetails.setDeleteDeltaEndTimestamp(timestamp)
-          segmentUpdateDetails.setDeleteDeltaStartTimestamp(timestamp)
-
-          val alreadyDeletedRows: Long = rowCountDetailsVO.getDeletedRowsInBlock
-          val totalDeletedRows: Long = alreadyDeletedRows + countOfRows
-          segmentUpdateDetails.setDeletedRowsInBlock(totalDeletedRows.toString)
-          if (totalDeletedRows == rowCountDetailsVO.getTotalNumberOfRows) {
-            segmentUpdateDetails.setStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
-          }
-          else {
-            // write the delta file
-            carbonDeleteWriter.write(deleteDeltaBlockDetails)
-          }
-
-          deleteStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-        } catch {
-          case e : MultipleMatchingException =>
-            LOGGER.audit(e.getMessage)
-            LOGGER.error(e.getMessage)
-          // dont throw exception here.
-          case e: Exception =>
-            val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
-            LOGGER.audit(errorMsg)
-            LOGGER.error(errorMsg + e.getMessage)
-            throw e
-        }
-
-
-        var finished = false
-
-        override def hasNext: Boolean = {
-          if (!finished) {
-            finished = true
-            finished
-          }
-          else {
-            !finished
-          }
-        }
-
-        override def next(): (String, (SegmentUpdateDetails, ExecutionErrors)) = {
-          finished = true
-          result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
-        }
-      }
-      resultIter
-    }
-    true
-  }
-}
-
-
-
-object UpdateExecution {
-
-  def performUpdate(
-         dataFrame: DataFrame,
-         tableIdentifier: Seq[String],
-         plan: LogicalPlan,
-         sqlContext: SQLContext,
-         currentTime: Long,
-         executorErrors: ExecutionErrors): Unit = {
-
-    def isDestinationRelation(relation: CarbonDatasourceRelation): Boolean = {
-
-      val tableName = relation.getTable()
-      val dbName = relation.getDatabaseName()
-      (tableIdentifier.size > 1 &&
-        tableIdentifier(0) == dbName &&
-        tableIdentifier(1) == tableName) ||
-        (tableIdentifier(0) == tableName)
-    }
-    def getHeader(relation: CarbonDatasourceRelation, plan: LogicalPlan): String = {
-      var header = ""
-      var found = false
-
-      plan match {
-        case Project(pList, _) if (!found) =>
-          found = true
-          header = pList
-            .filter(field => !field.name
-              .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-            .map(col => if (col.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION)) {
-              col.name
-                .substring(0, col.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION))
-            }
-            else {
-              col.name
-            }).mkString(",")
-      }
-      header
-    }
-    val ex = dataFrame.queryExecution.analyzed
-    val res = ex find {
-      case relation: LogicalRelation if (relation.relation.isInstanceOf[CarbonDatasourceRelation] &&
-        isDestinationRelation(relation.relation
-          .asInstanceOf[CarbonDatasourceRelation])) =>
-        true
-      case _ => false
-    }
-    val carbonRelation: CarbonDatasourceRelation = res match {
-      case Some(relation: LogicalRelation) =>
-        relation.relation.asInstanceOf[CarbonDatasourceRelation]
-      case _ => sys.error("")
-    }
-
-    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors)
-
-    val header = getHeader(carbonRelation, plan)
-
-
-
-    LoadTable(
-      Some(carbonRelation.getDatabaseName()),
-      carbonRelation.getTable(),
-      null,
-      Seq(),
-      Map(("fileheader" -> header)),
-      false,
-      null,
-      Some(dataFrame),
-      Some(updateTableModel)).run(sqlContext)
-
-
-    executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
-    executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
-
-    Seq.empty
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
deleted file mode 100644
index 9814cc2..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import scala.language.implicitConversions
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
-import org.apache.spark.sql.hive.CarbonMetastore
-import org.apache.spark.sql.types.TimestampType
-import org.apache.spark.util.{CausedBy, FileUtils}
-import org.codehaus.jackson.map.ObjectMapper
-
-import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.exception.InvalidConfigurationException
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
-
-object Checker {
-  def validateTableExists(
-      dbName: Option[String],
-      tableName: String,
-      sqlContext: SQLContext): Unit = {
-    val identifier = TableIdentifier(tableName, dbName)
-    if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(sqlContext)) {
-      val err = s"table $dbName.$tableName not found"
-      LogServiceFactory.getLogService(this.getClass.getName).error(err)
-      throw new IllegalArgumentException(err)
-    }
-  }
-}
-
-/**
- * Command for show table partitions Command
- *
- * @param tableIdentifier
- */
-private[sql] case class ShowCarbonPartitionsCommand(
-    tableIdentifier: TableIdentifier) extends RunnableCommand {
-  val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
-  override val output = CommonUtil.partitionInfoOutput
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(tableIdentifier)(sqlContext).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.tableMeta.carbonTable
-    var tableName = carbonTable.getFactTableName
-    var partitionInfo = carbonTable.getPartitionInfo(
-      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
-    if (partitionInfo == null) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
-    }
-    var partitionType = partitionInfo.getPartitionType
-    var columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
-    LOGGER.info("partition column name:" + columnName)
-    CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
-  }
-}
-
-/**
- * Command for the compaction in alter table command
- *
- * @param alterTableModel
- */
-private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
-  RunnableCommand {
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    // TODO : Implement it.
-    val tableName = alterTableModel.tableName
-    val databaseName = getDB.getDatabaseName(alterTableModel.dbName, sqlContext)
-    if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) {
-      logError(s"alter table failed. table not found: $databaseName.$tableName")
-      sys.error(s"alter table failed. table not found: $databaseName.$tableName")
-    }
-
-    val relation =
-      CarbonEnv.get.carbonMetastore
-        .lookupRelation1(Option(databaseName), tableName)(sqlContext)
-        .asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      sys.error(s"Table $databaseName.$tableName does not exist")
-    }
-    val carbonLoadModel = new CarbonLoadModel()
-
-
-    val table = relation.tableMeta.carbonTable
-    carbonLoadModel.setTableName(table.getFactTableName)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    // Need to fill dimension relation
-    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-    carbonLoadModel.setStorePath(relation.tableMeta.storePath)
-
-    var storeLocation = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
-        System.getProperty("java.io.tmpdir")
-      )
-    storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-    try {
-      CarbonDataRDDFactory.alterTableForCompaction(sqlContext,
-          alterTableModel,
-          carbonLoadModel,
-          relation.tableMeta.storePath,
-          storeLocation
-        )
-    } catch {
-      case e: Exception =>
-        if (null != e.getMessage) {
-          sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
-        } else {
-          sys.error("Exception in compaction. Please check logs for more info.")
-        }
-    }
-    Seq.empty
-  }
-}
-
-case class CreateTable(cm: TableModel) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sqlContext)
-    val tbName = cm.tableName
-    val dbName = cm.databaseName
-    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
-
-    val tableInfo: TableInfo = TableNewProcessor(cm)
-
-    // Add validation for sort scope when create table
-    val sortScope = tableInfo.getFactTable.getTableProperties
-      .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
-    if (!CarbonUtil.isValidSortOption(sortScope)) {
-      throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
-        s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
-    }
-
-    if (tableInfo.getFactTable.getListOfColumns.isEmpty) {
-      sys.error("No Dimensions found. Table should have at least one dimesnion !")
-    }
-
-    if (sqlContext.tableNames(dbName).exists(_.equalsIgnoreCase(tbName))) {
-      if (!cm.ifNotExistsSet) {
-        LOGGER.audit(
-          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
-          s"Table [$tbName] already exists under database [$dbName]")
-        sys.error(s"Table [$tbName] already exists under database [$dbName]")
-      }
-    } else {
-      // Add Database to catalog and persist
-      val catalog = CarbonEnv.get.carbonMetastore
-      // Need to fill partitioner class when we support partition
-      val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName, null)(sqlContext)
-      try {
-        sqlContext.sql(
-          s"""CREATE TABLE $dbName.$tbName USING carbondata""" +
-          s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
-          .collect
-      } catch {
-        case e: Exception =>
-          val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
-          // call the drop table to delete the created table.
-
-          CarbonEnv.get.carbonMetastore
-            .dropTable(catalog.storePath, identifier)(sqlContext)
-
-          LOGGER.audit(s"Table creation with Database name [$dbName] " +
-                       s"and Table name [$tbName] failed")
-          throw e
-      }
-
-      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
-    }
-
-    Seq.empty
-  }
-
-  def setV(ref: Any, name: String, value: Any): Unit = {
-    ref.getClass.getFields.find(_.getName == name).get
-      .set(ref, value.asInstanceOf[AnyRef])
-  }
-}
-
-private[sql] case class DeleteLoadsById(
-    loadids: Seq[String],
-    databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
-    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
-      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    CarbonStore.deleteLoadById(
-      loadids,
-      getDB.getDatabaseName(databaseNameOp, sqlContext),
-      tableName,
-      carbonTable
-    )
-    Seq.empty
-
-  }
-
-  // validates load ids
-  private def validateLoadIds: Unit = {
-    if (loadids.isEmpty) {
-      val errorMessage = "Error: Segment id(s) should not be empty."
-      throw new MalformedCarbonCommandException(errorMessage)
-
-    }
-  }
-}
-
-private[sql] case class DeleteLoadsByLoadDate(
-    databaseNameOp: Option[String],
-    tableName: String,
-    dateField: String,
-    loadDate: String) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.tablemodel.tableSchema")
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
-    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
-      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    CarbonStore.deleteLoadByDate(
-      loadDate,
-      getDB.getDatabaseName(databaseNameOp, sqlContext),
-      tableName,
-      carbonTable
-    )
-    Seq.empty
-
-  }
-
-}
-
-object LoadTable {
-
-  def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      model: DictionaryLoadModel,
-      noDictDimension: Array[CarbonDimension]): Unit = {
-
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
-      model.table)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-
-    // read TableInfo
-    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
-
-    // modify TableInfo
-    val columns = tableInfo.getFact_table.getTable_columns
-    for (i <- 0 until columns.size) {
-      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
-        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
-      }
-    }
-
-    // write TableInfo
-    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
-    // update Metadata
-    val catalog = CarbonEnv.get.carbonMetastore
-    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
-      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
-    // update CarbonDataLoadSchema
-    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
-      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
-  }
-
-}
-
-private[sql] case class LoadTableByInsert(relation: CarbonDatasourceRelation,
-    child: LogicalPlan, isOverwriteExist: Boolean) extends RunnableCommand {
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val df = new DataFrame(sqlContext, child)
-    val header = relation.carbonRelation.output.map(_.name).mkString(",")
-    val load = LoadTable(
-      Some(relation.carbonRelation.databaseName),
-      relation.carbonRelation.tableName,
-      null,
-      Seq(),
-      scala.collection.immutable.Map("fileheader" -> header),
-      isOverwriteExist,
-      null,
-      Some(df)).run(sqlContext)
-    // updating relation metadata. This is in case of auto detect high cardinality
-    relation.carbonRelation.metaData =
-      CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
-    load
-  }
-}
-case class LoadTable(
-    databaseNameOp: Option[String],
-    tableName: String,
-    factPathFromUser: String,
-    dimFilesPath: Seq[DataLoadTableFileMapping],
-    options: scala.collection.immutable.Map[String, String],
-    isOverwriteExist: Boolean,
-    var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None,
-    updateModel: Option[UpdateTableModel] = None) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  private def checkDefaultValue(value: String, default: String) = if (StringUtils.isEmpty(value)) {
-    default
-  } else {
-    value
-  }
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    if (dataFrame.isDefined && !updateModel.isDefined) {
-      val rdd = dataFrame.get.rdd
-      if (rdd.partitions == null || rdd.partitions.length == 0) {
-        LOGGER.warn("DataLoading finished. No data was loaded.")
-        return Seq.empty
-      }
-    }
-
-    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
-      logError(s"Data loading failed. table not found: $dbName.$tableName")
-      LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
-      sys.error(s"Data loading failed. table not found: $dbName.$tableName")
-    }
-
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(Option(dbName), tableName)(sqlContext)
-      .asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
-    val carbonLock = CarbonLockFactory
-      .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
-        .getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK
-      )
-    try {
-      // take lock only in case of normal data load.
-      if (!updateModel.isDefined) {
-        if (carbonLock.lockWithRetries()) {
-          logInfo("Successfully able to get the table metadata file lock")
-        } else {
-          sys.error("Table is locked for updation. Please try after some time")
-        }
-      }
-
-      val factPath = if (dataFrame.isDefined) {
-        ""
-      } else {
-        FileUtils.getPaths(
-          CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
-      }
-      val carbonLoadModel = new CarbonLoadModel()
-      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.storePath)
-
-      val table = relation.tableMeta.carbonTable
-      carbonLoadModel.setTableName(table.getFactTableName)
-      val dataLoadSchema = new CarbonDataLoadSchema(table)
-      // Need to fill dimension relation
-      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-
-      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
-                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
-
-
-      val columnar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-
-      val delimiter = options.getOrElse("delimiter", ",")
-      val quoteChar = options.getOrElse("quotechar", "\"")
-      var fileHeader = options.getOrElse("fileheader", "")
-      val escapeChar = options.getOrElse("escapechar", "\\")
-      val commentchar = options.getOrElse("commentchar", "#")
-      val columnDict = options.getOrElse("columndict", null)
-      val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
-      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-      val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
-      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
-      val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
-      val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
-      val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
-      val dateFormat = options.getOrElse("dateformat", null)
-      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
-      val maxColumns = options.getOrElse("maxcolumns", null)
-      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-      val sortScopeDefault = CarbonProperties.getInstance().
-        getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-      val sortScope = if (null == tableProperties) {
-        sortScopeDefault
-      } else {
-        tableProperties.getOrDefault("sort_scope", sortScopeDefault)
-      }
-
-      ValidateUtil.validateSortScope(table, sortScope)
-      val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
-      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-          CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
-      val globalSortPartitions = options.getOrElse("global_sort_partitions", carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
-      ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
-
-      // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
-      // we should use table schema to generate file header.
-      val headerOption = options.get("header")
-      if (headerOption.isDefined) {
-        // whether the csv file has file header
-        // the default value is true
-        val header = try {
-          headerOption.get.toBoolean
-        } catch {
-          case ex: IllegalArgumentException =>
-            throw new MalformedCarbonCommandException(
-              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
-        }
-        header match {
-          case true =>
-            if (fileHeader.nonEmpty) {
-              throw new MalformedCarbonCommandException(
-                "When 'header' option is true, 'fileheader' option is not required.")
-            }
-          case false =>
-            // generate file header
-            if (fileHeader.isEmpty) {
-              fileHeader = table.getCreateOrderColumn(table.getFactTableName)
-                .asScala.map(_.getColName).mkString(",")
-            }
-        }
-      }
-
-      val bad_record_path = options.getOrElse("bad_record_path",
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-      if (badRecordsLoggerEnable.toBoolean ||
-          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
-        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
-          sys.error("Invalid bad records location.")
-        }
-      }
-      carbonLoadModel.setBadRecordsLocation(bad_record_path)
-      carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
-      carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
-      carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
-      carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_DATE_FORMAT,
-        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-      carbonLoadModel
-        .setSerializationNullFormat(
-          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat)
-      carbonLoadModel
-        .setBadRecordsLoggerEnable(
-          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
-      carbonLoadModel
-        .setBadRecordsAction(
-          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
-      carbonLoadModel
-        .setIsEmptyDataBadRecord(
-          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
-      carbonLoadModel.setSortScope(sortScope)
-      carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
-      carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
-      // when single_pass=true, and not use all dict
-      val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
-        case "true" =>
-          true
-        case "false" =>
-          // when single_pass = false  and if either alldictionary
-          // or columnDict is configured the do not allow load
-          if (StringUtils.isNotEmpty(allDictionaryPath) || StringUtils.isNotEmpty(columnDict)) {
-            throw new MalformedCarbonCommandException(
-              "Can not use all_dictionary_path or columndict without single_pass.")
-          } else {
-            false
-          }
-        case illegal =>
-          LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
-                       "Please set it as 'true' or 'false'")
-          false
-      }
-      carbonLoadModel.setUseOnePass(useOnePass)
-
-      if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
-          complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||
-          delimiter.equalsIgnoreCase(complex_delimiter_level_2)) {
-        sys.error(s"Field Delimiter & Complex types delimiter are same")
-      }
-      else {
-        carbonLoadModel.setComplexDelimiterLevel1(
-          CarbonUtil.delimiterConverter(complex_delimiter_level_1))
-        carbonLoadModel.setComplexDelimiterLevel2(
-          CarbonUtil.delimiterConverter(complex_delimiter_level_2))
-      }
-      // set local dictionary path, and dictionary file extension
-      carbonLoadModel.setAllDictPath(allDictionaryPath)
-
-      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-
-      try {
-        // First system has to partition the data first and then call the load data
-        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-        carbonLoadModel.setFactFilePath(factPath)
-        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
-        carbonLoadModel.setCsvHeader(fileHeader)
-        carbonLoadModel.setColDictFilePath(columnDict)
-        carbonLoadModel.setDirectLoad(true)
-        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
-        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
-          maxColumns)
-        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
-        GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
-        val storePath = relation.tableMeta.storePath
-        val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-        val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-          .getCarbonTableIdentifier
-        val carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(storePath, carbonTableIdentifier)
-        val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
-        val dimensions = carbonTable.getDimensionByTableName(
-          carbonTable.getFactTableName).asScala.toArray
-        // add the start entry for the new load in the table status file
-        if (!updateModel.isDefined) {
-          CommonUtil.
-            readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteExist)
-        }
-        if (isOverwriteExist) {
-          LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName")
-        }
-        if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-        }
-        if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
-            StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) {
-          LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
-          LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
-          carbonLoadModel.setUseOnePass(false)
-        }
-        if (carbonLoadModel.getUseOnePass) {
-          val colDictFilePath = carbonLoadModel.getColDictFilePath
-          if (!StringUtils.isEmpty(colDictFilePath)) {
-            carbonLoadModel.initPredefDictMap()
-            // generate predefined dictionary
-            GlobalDictionaryUtil
-              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-                dimensions, carbonLoadModel, sqlContext, storePath, dictFolderPath)
-          }
-          val allDictPath: String = carbonLoadModel.getAllDictPath
-          if(!StringUtils.isEmpty(allDictPath)) {
-            carbonLoadModel.initPredefDictMap()
-            GlobalDictionaryUtil
-              .generateDictionaryFromDictionaryFiles(sqlContext,
-                carbonLoadModel,
-                storePath,
-                carbonTableIdentifier,
-                dictFolderPath,
-                dimensions,
-                allDictionaryPath)
-          }
-          // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
-          val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
-          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-          // start dictionary server when use one pass load and dimension with DICTIONARY
-          // encoding is present.
-          val allDimensions = table.getAllDimensions.asScala.toList
-          val createDictionary = allDimensions.exists {
-            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-              !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-          }
-          val server: Option[DictionaryServer] = if (createDictionary) {
-            val dictionaryServer = DictionaryServer
-              .getInstance(dictionaryServerPort.toInt, carbonTable)
-            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
-            sqlContext.sparkContext.addSparkListener(new SparkListener() {
-              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-                dictionaryServer.shutdown()
-              }
-            })
-            Some(dictionaryServer)
-          } else {
-            None
-          }
-          CarbonDataRDDFactory.loadCarbonData(sqlContext,
-            carbonLoadModel,
-            relation.tableMeta.storePath,
-            columnar,
-            partitionStatus,
-            server,
-            isOverwriteExist,
-            dataFrame,
-            updateModel)
-        } else {
-          val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-            val fields = dataFrame.get.schema.fields
-            import org.apache.spark.sql.functions.udf
-            // extracting only segment from tupleId
-            val getSegIdUDF = udf((tupleId: String) =>
-              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-            // getting all fields except tupleId field as it is not required in the value
-            var otherFields = fields.toSeq
-              .filter(field => !field.name
-                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-              .map(field => {
-                if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
-                  new Column(field.name
-                    .substring(0,
-                      field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
-                } else {
-
-                  new Column(field.name)
-                }
-              })
-
-            // extract tupleId field which will be used as a key
-            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-              as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
-            // use dataFrameWithoutTupleId as dictionaryDataFrame
-            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-            otherFields = otherFields :+ segIdColumn
-            // use dataFrameWithTupleId as loadDataFrame
-            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
-          } else {
-            (dataFrame, dataFrame)
-          }
-          GlobalDictionaryUtil
-            .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
-              dictionaryDataFrame)
-          CarbonDataRDDFactory.loadCarbonData(sqlContext,
-            carbonLoadModel,
-            relation.tableMeta.storePath,
-            columnar,
-            partitionStatus,
-            None,
-            isOverwriteExist,
-            loadDataFrame,
-            updateModel)
-        }
-      } catch {
-        case CausedBy(ex: NoRetryException) =>
-          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
-          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
-        case ex: Exception =>
-          LOGGER.error(ex)
-          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
-          throw ex
-      } finally {
-        // Once the data load is successful delete the unwanted partition files
-        try {
-          val fileType = FileFactory.getFileType(partitionLocation)
-          if (FileFactory.isFileExist(partitionLocation, fileType)) {
-            val file = FileFactory
-              .getCarbonFile(partitionLocation, fileType)
-            CarbonUtil.deleteFoldersAndFiles(file)
-          }
-        } catch {
-          case ex: Exception =>
-            LOGGER.error(ex)
-            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
-                         "Problem deleting the partition folder")
-            throw ex
-        }
-
-      }
-    } catch {
-      case dle: DataLoadingException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
-        throw dle
-      case mce: MalformedCarbonCommandException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
-        throw mce
-    } finally {
-      if (carbonLock != null) {
-        if (carbonLock.unlock()) {
-          logInfo("Table MetaData Unlocked Successfully after data load")
-        } else {
-          logError("Unable to unlock Table MetaData")
-        }
-      }
-    }
-    Seq.empty
-  }
-
-  private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
-      sqlContext: SQLContext,
-      model: DictionaryLoadModel,
-      noDictDimension: Array[CarbonDimension]): Unit = {
-
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
-      model.table)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-
-    // read TableInfo
-    val tableInfo = CarbonMetastore.readSchemaFileToThriftTable(schemaFilePath)
-
-    // modify TableInfo
-    val columns = tableInfo.getFact_table.getTable_columns
-    for (i <- 0 until columns.size) {
-      if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
-        columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
-      }
-    }
-
-    // write TableInfo
-    CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
-
-    val catalog = CarbonEnv.get.carbonMetastore
-
-    // upate the schema modified time
-    catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
-      carbonLoadModel.getDatabaseName,
-      carbonLoadModel.getTableName))
-
-    // update Metadata
-    catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
-      model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
-    // update CarbonDataLoadSchema
-    val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
-      model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
-  }
-
-}
-
-private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: Option[String],
-    tableName: String)
-  extends RunnableCommand {
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
-    val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
-    val catalog = CarbonEnv.get.carbonMetastore
-    val storePath = catalog.storePath
-    try {
-      locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
-      }
-      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sqlContext)
-      LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
-    } catch {
-      case ex: Exception =>
-        LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
-        sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
-    } finally {
-      if (carbonLocks.nonEmpty) {
-        val unlocked = carbonLocks.forall(_.unlock())
-        if (unlocked) {
-          logInfo("Table MetaData Unlocked Successfully")
-          // deleting any remaining files.
-          val metadataFilePath = CarbonStorePath
-            .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
-          val fileType = FileFactory.getFileType(metadataFilePath)
-          if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-            val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-            CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
-          }
-        }
-      }
-    }
-    Seq.empty
-  }
-}
-
-private[sql] case class ShowLoads(
-    databaseNameOp: Option[String],
-    tableName: String,
-    limit: Option[String],
-    override val output: Seq[Attribute]) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
-    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
-      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    CarbonStore.showSegments(
-      getDB.getDatabaseName(databaseNameOp, sqlContext),
-      tableName,
-      limit,
-      carbonTable.getMetaDataFilepath
-    )
-  }
-}
-
-private[sql] case class DescribeCommandFormatted(
-    child: SparkPlan,
-    override val output: Seq[Attribute],
-    tblIdentifier: TableIdentifier)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
-    val mapper = new ObjectMapper()
-    val colProps = StringBuilder.newBuilder
-    var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
-      val comment = if (relation.metaData.dims.contains(field.name)) {
-        val dimension = relation.metaData.carbonTable.getDimensionByName(
-          relation.tableMeta.carbonTableIdentifier.getTableName,
-          field.name)
-        if (null != dimension.getColumnProperties && dimension.getColumnProperties.size() > 0) {
-          colProps.append(field.name).append(".")
-            .append(mapper.writeValueAsString(dimension.getColumnProperties))
-            .append(",")
-        }
-        if (dimension.hasEncoding(Encoding.DICTIONARY)) {
-          "DICTIONARY, KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match {
-                      case false => ",NOINVERTEDINDEX"
-                      case _ => ""
-                    })
-        } else {
-          "KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match {
-            case false => ",NOINVERTEDINDEX"
-            case _ => ""
-          })
-        }
-      } else {
-        "MEASURE"
-      }
-      (field.name, field.dataType.simpleString, comment)
-    }
-    val colPropStr = if (colProps.toString().trim().length() > 0) {
-      // drops additional comma at endpom
-      colProps.toString().dropRight(1)
-    } else {
-      colProps.toString()
-    }
-    results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
-    results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
-      .getDatabaseName, "")
-    )
-    results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
-    results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
-    val carbonTable = relation.tableMeta.carbonTable
-    results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
-    results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
-      .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
-      .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
-    if (colPropStr.length() > 0) {
-      results ++= Seq((colPropStr, "", ""))
-    } else {
-      results ++= Seq(("ADAPTIVE", "", ""))
-    }
-    results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
-      relation.tableMeta.carbonTableIdentifier.getTableName).asScala
-      .map(column => column).mkString(","), ""))
-    val dimension = carbonTable
-      .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-    results ++= getColumnGroups(dimension.asScala.toList)
-    if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
-      results ++=
-      Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
-        .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
-    }
-    results.map { case (name, dataType, comment) =>
-      Row(f"$name%-36s $dataType%-80s $comment%-72s")
-    }
-  }
-
-  private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
-    var results: Seq[(String, String, String)] =
-      Seq(("", "", ""), ("##Column Group Information", "", ""))
-    val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
-      case (groupId, _) => groupId != -1
-    }.toSeq.sortBy(_._1)
-    val groups = groupedDimensions.map(colGroups => {
-      colGroups._2.map(dim => dim.getColName).mkString(", ")
-    })
-    var index = 1
-    groups.foreach { x =>
-      results = results :+ (s"Column Group $index", x, "")
-      index = index + 1
-    }
-    results
-  }
-}
-
-private[sql] case class DeleteLoadByDate(
-    databaseNameOp: Option[String],
-    tableName: String,
-    dateField: String,
-    dateValue: String
-) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    val dbName = getDB.getDatabaseName(databaseNameOp, sqlContext)
-    LOGGER.audit(s"The delete load by date request has been received for $dbName.$tableName")
-    val identifier = TableIdentifier(tableName, Option(dbName))
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
-    var level: String = ""
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
-    if (relation == null) {
-      LOGGER.audit(s"The delete load by date is failed. Table $dbName.$tableName does not exist")
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
-      filter => filter.name.equalsIgnoreCase(dateField) &&
-                filter.dataType.isInstanceOf[TimestampType]).toList
-    if (matches.isEmpty) {
-      LOGGER.audit("The delete load by date is failed. " +
-                   s"Table $dbName.$tableName does not contain date field: $dateField")
-      sys.error(s"Table $dbName.$tableName does not contain date field $dateField")
-    } else {
-      level = matches.asJava.get(0).name
-    }
-    val actualColName = relation.metaData.carbonTable.getDimensionByName(tableName, level)
-      .getColName
-    DataManagementFunc.deleteLoadByDate(
-      sqlContext,
-      new CarbonDataLoadSchema(carbonTable),
-      dbName,
-      tableName,
-      CarbonEnv.get.carbonMetastore.storePath,
-      level,
-      actualColName,
-      dateValue)
-    LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.")
-    Seq.empty
-  }
-
-}
-
-private[sql] case class CleanFiles(
-    databaseNameOp: Option[String],
-    tableName: String) extends RunnableCommand {
-
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
-    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
-      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
-    CarbonStore.cleanFiles(
-      getDB.getDatabaseName(databaseNameOp, sqlContext),
-      tableName,
-      sqlContext.asInstanceOf[CarbonContext].storePath,
-      carbonTable,
-      false)
-    Seq.empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
deleted file mode 100644
index d23b18f..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
-import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-/**
- * Insert into carbon table from other source
- */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
-    // Wait until children are resolved.
-    case p: LogicalPlan if !p.childrenResolved => p
-
-    case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _)
-      if relation.relation.isInstanceOf[CarbonDatasourceRelation] =>
-      castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child)
-  }
-
-  def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan)
-  : LogicalPlan = {
-    if (relation.carbonRelation.output.size > CarbonCommonConstants
-      .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
-      sys
-        .error("Maximum supported column by carbon is:" + CarbonCommonConstants
-          .DEFAULT_MAX_NUMBER_OF_COLUMNS
-        )
-    }
-    if (child.output.size >= relation.carbonRelation.output.size ) {
-      InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists)
-    } else {
-      sys.error("Cannot insert into target table because column number are different")
-    }
-  }
-}
-
-
-object CarbonIUDAnalysisRule extends Rule[LogicalPlan] {
-
-  var sqlContext: SQLContext = _
-
-  def init(sqlContext: SQLContext) {
-    this.sqlContext = sqlContext
-  }
-
-  private def processUpdateQuery(
-                   table: UnresolvedRelation,
-                   columns: List[String],
-                   selectStmt: String,
-                   filter: String): LogicalPlan = {
-    var includedDestColumns = false
-    var includedDestRelation = false
-    var addedTupleId = false
-
-    def prepareTargetReleation(relation: UnresolvedRelation): Subquery = {
-      val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
-        Seq.empty, isDistinct = false), "tupleId")())
-      val projList = Seq(
-        UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
-      // include tuple id and rest of the required columns in subqury
-      Subquery(table.alias.getOrElse(""), Project(projList, relation))
-    }
-    // get the un-analyzed logical plan
-    val targetTable = prepareTargetReleation(table)
-    val selectPlan = org.apache.spark.sql.SQLParser.parse(selectStmt, sqlContext) transform {
-      case Project(projectList, child) if (!includedDestColumns) =>
-        includedDestColumns = true
-        if (projectList.size != columns.size) {
-          sys.error("Number of source and destination columns are not matching")
-        }
-        val renamedProjectList = projectList.zip(columns).map{ case(attr, col) =>
-          attr match {
-            case UnresolvedAlias(child) =>
-              UnresolvedAlias(Alias(child, col + "-updatedColumn")())
-            case _ => attr
-          }
-        }
-        val list = Seq(
-          UnresolvedAlias(UnresolvedStar(table.alias))) ++ renamedProjectList
-        Project(list, child)
-      case Filter(cond, child) if (!includedDestRelation) =>
-        includedDestRelation = true
-        Filter(cond, Join(child, targetTable, Inner, None))
-      case r @ UnresolvedRelation(t, a) if (!includedDestRelation &&
-        t != table.tableIdentifier) =>
-        includedDestRelation = true
-        Join(r, targetTable, Inner, None)
-    }
-    val updatedSelectPlan = if (!includedDestRelation) {
-      // special case to handle self join queries
-      // Eg. update tableName  SET (column1) = (column1+1)
-      selectPlan transform {
-        case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
-          addedTupleId == false) =>
-          addedTupleId = true
-          targetTable
-      }
-    } else {
-      selectPlan
-    }
-    val finalPlan = if (filter.length > 0) {
-      val alias = table.alias.getOrElse("")
-      var transformed: Boolean = false
-      // Create a dummy projection to include filter conditions
-      SQLParser.parse("select * from  " +
-        table.tableIdentifier.mkString(".") + " " + alias + " " + filter, sqlContext)  transform {
-        case UnresolvedRelation(t, Some(a)) if (
-          !transformed && t == table.tableIdentifier && a == alias) =>
-          transformed = true
-          // Add the filter condition of update statement  on destination table
-          Subquery(alias, updatedSelectPlan)
-      }
-    } else {
-      updatedSelectPlan
-    }
-    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
-    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
-    val destinationTable = UnresolvedRelation(tidSeq, table.alias)
-    ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
-  }
-
-  def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
-    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(table.tableIdentifier)
-    val tidSeq = Seq(getDB.getDatabaseName(tid.database, sqlContext), tid.table)
-    var addedTupleId = false
-    val selectPlan = SQLParser.parse(selectStmt, sqlContext) transform {
-      case relation: UnresolvedRelation if (table.tableIdentifier == relation.tableIdentifier &&
-        addedTupleId == false) =>
-        addedTupleId = true
-        val tupleId = UnresolvedAlias(Alias(UnresolvedFunction("getTupleId",
-          Seq.empty, isDistinct = false), "tupleId")())
-        val projList = Seq(
-          UnresolvedAlias(UnresolvedStar(table.alias)), tupleId)
-        // include tuple id in subqury
-        Project(projList, relation)
-    }
-    ProjectForDeleteCommand(
-      selectPlan,
-      tidSeq,
-      System.currentTimeMillis().toString)
-  }
-
-  override def apply(logicalplan: LogicalPlan): LogicalPlan = {
-
-    logicalplan transform {
-      case UpdateTable(t, cols, sel, where) => processUpdateQuery(t, cols, sel, where)
-      case DeleteRecords(statement, table) => processDeleteRecordsQuery(statement, table)
-    }
-  }
-}