Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/29 10:41:13 UTC

[2/4] carbondata git commit: [CARBONDATA-1815][PreAgg] Add AtomicRunnableCommand abstraction

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
index e2bfbdc..4e983a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
@@ -41,15 +41,66 @@ object Checker {
 }
 
 /**
- * Interface for command that modifies schema
+ * Operation that modifies metadata (schema, table_status, etc.)
  */
-trait SchemaProcessCommand {
-  def processSchema(sparkSession: SparkSession): Seq[Row]
+trait MetadataProcessOperation {
+  def processMetadata(sparkSession: SparkSession): Seq[Row]
 }
 
 /**
- * Interface for command that need to process data in file system
+ * Operation that processes data
  */
-trait DataProcessCommand {
+trait DataProcessOperation {
   def processData(sparkSession: SparkSession): Seq[Row]
 }
+
+/**
+ * Command that only modifies metadata (schema, table_status, etc.), without processing data
+ */
+abstract class MetadataCommand extends RunnableCommand with MetadataProcessOperation {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processMetadata(sparkSession)
+  }
+}
+
+/**
+ * Command that only processes data, without modifying metadata
+ */
+abstract class DataCommand extends RunnableCommand with DataProcessOperation {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processData(sparkSession)
+  }
+}
+
+/**
+ * Subclasses of this command are executed in an atomic fashion: either everything
+ * succeeds or nothing does. A subclass must process both metadata and data, and
+ * processMetadata should be undoable if processing the data fails.
+ */
+abstract class AtomicRunnableCommand
+  extends RunnableCommand with MetadataProcessOperation with DataProcessOperation {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processMetadata(sparkSession)
+    try {
+      processData(sparkSession)
+    } catch {
+      case e: Exception =>
+        undoMetadata(sparkSession, e)
+        throw e
+    }
+  }
+
+  /**
+   * Developers should override this function to undo the changes made in processMetadata.
+   * @param sparkSession spark session
+   * @param exception exception raised when processing data
+   * @return rows to return to spark
+   */
+  def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+    val msg = s"Got exception $exception when processing data. " +
+              s"But this command does not support undo yet, skipping the undo part."
+    LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
+    Seq.empty
+  }
+}
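
To illustrate the intended contract, here is a minimal sketch of a command built on
the new AtomicRunnableCommand abstraction. The command and its bodies are hypothetical
and not part of this patch (assuming the imports already present in package.scala):
processMetadata runs first, processData follows, and undoMetadata reverts the metadata
change if data processing throws.

    // Hypothetical example, for illustration only
    case class ExampleCreateTableCommand(tableName: String)
      extends AtomicRunnableCommand {

      // Step 1: record the new table in the metastore (must be revertible)
      override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
        // e.g. write the schema file and register the table
        Seq.empty
      }

      // Step 2: write the table's data; may throw on failure
      override def processData(sparkSession: SparkSession): Seq[Row] = {
        // e.g. trigger the initial data load
        Seq.empty
      }

      // Called by AtomicRunnableCommand.run when processData throws,
      // so the whole command behaves as all-or-nothing
      override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
        // e.g. remove the table registered in processMetadata
        Seq.empty
      }
    }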

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
deleted file mode 100644
index db87fc8..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.partition
-
-import java.util
-import java.util.concurrent.{Executors, ExecutorService, Future}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableDropPartitionModel, DataProcessCommand, DropPartitionCallableModel, RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.partition.DropPartitionCallable
-
-case class AlterTableDropCarbonPartitionCommand(
-    model: AlterTableDropPartitionModel)
-  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
-
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-  private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    if (model.partitionId.equals("0")) {
-      sys.error(s"Cannot drop default partition! Please use delete statement!")
-    }
-    processSchema(sparkSession)
-    processData(sparkSession)
-    Seq.empty
-  }
-
-  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = model.tableName
-    val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-    val tablePath = relation.carbonTable.getTablePath
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
-    if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
-      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
-      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
-    }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
-    if (partitionInfo == null) {
-      sys.error(s"Table $tableName is not a partition table.")
-    }
-    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
-    // keep a copy of partitionIdList before update partitionInfo.
-    // will be used in partition data scan
-    oldPartitionIds.addAll(partitionIds.asJava)
-    val partitionIndex = partitionIds.indexOf(Integer.valueOf(model.partitionId))
-    partitionInfo.getPartitionType match {
-      case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!")
-      case PartitionType.RANGE =>
-        val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
-        val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
-        rangeInfo.remove(rangeToRemove)
-        partitionInfo.setRangeInfo(rangeInfo)
-      case PartitionType.LIST =>
-        val listInfo = new util.ArrayList(partitionInfo.getListInfo)
-        val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
-        listInfo.remove(listToRemove)
-        partitionInfo.setListInfo(listInfo)
-      case PartitionType.RANGE_INTERVAL =>
-        sys.error(s"Dropping range interval partition isn't support yet!")
-    }
-    partitionInfo.dropPartition(partitionIndex)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
-
-    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
-      dbName, tableName, tablePath)
-    val tableSchema = wrapperTableInfo.getFactTable
-    tableSchema.setPartitionInfo(partitionInfo)
-    wrapperTableInfo.setFactTable(tableSchema)
-    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
-    val thriftTable =
-      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-    thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-      .setTime_stamp(System.currentTimeMillis)
-    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
-      dbName, tableName, tablePath)
-    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
-    // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
-    // sparkSession.catalog.refreshTable(tableName)
-    Seq.empty
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-    val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = model.tableName
-    var locks = List.empty[ICarbonLock]
-    var success = false
-    try {
-      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
-        LockUsage.COMPACTION_LOCK,
-        LockUsage.DELETE_SEGMENT_LOCK,
-        LockUsage.DROP_TABLE_LOCK,
-        LockUsage.CLEAN_FILES_LOCK,
-        LockUsage.ALTER_PARTITION_LOCK)
-      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
-        locksToBeAcquired)(sparkSession)
-      val carbonLoadModel = new CarbonLoadModel()
-      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-      val dataLoadSchema = new CarbonDataLoadSchema(table)
-      // Need to fill dimension relation
-      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      carbonLoadModel.setTableName(table.getTableName)
-      carbonLoadModel.setDatabaseName(table.getDatabaseName)
-      carbonLoadModel.setTablePath(table.getTablePath)
-      val loadStartTime = CarbonUpdateUtil.readCurrentTime
-      carbonLoadModel.setFactTimeStamp(loadStartTime)
-      alterTableDropPartition(
-        sparkSession.sqlContext,
-        model.partitionId,
-        carbonLoadModel,
-        model.dropWithData,
-        oldPartitionIds.asScala.toList
-      )
-      success = true
-    } catch {
-      case e: Exception =>
-        sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage } ")
-        success = false
-    } finally {
-      CacheProvider.getInstance().dropAllCache()
-      AlterTableUtil.releaseLocks(locks)
-      LOGGER.info("Locks released after alter table drop partition action.")
-      LOGGER.audit("Locks released after alter table drop partition action.")
-    }
-    LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
-    LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
-    Seq.empty
-  }
-
-  private def alterTableDropPartition(sqlContext: SQLContext,
-      partitionId: String,
-      carbonLoadModel: CarbonLoadModel,
-      dropWithData: Boolean,
-      oldPartitionIds: List[Int]): Unit = {
-    LOGGER.audit(s"Drop partition request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-    try {
-      startDropThreads(
-        sqlContext,
-        carbonLoadModel,
-        partitionId,
-        dropWithData,
-        oldPartitionIds)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in start dropping partition thread. ${ e.getMessage }")
-        throw e
-    }
-  }
-
-  private def startDropThreads(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      partitionId: String,
-      dropWithData: Boolean,
-      oldPartitionIds: List[Int]): Unit = {
-    val numberOfCores = CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
-        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
-    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
-    try {
-      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
-      val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
-      var i = 0
-      for (segmentId: String <- validSegments) {
-        threadArray(i) = dropPartitionThread(sqlContext, carbonLoadModel, executor,
-          segmentId, partitionId, dropWithData, oldPartitionIds)
-        threadArray(i).start()
-        i += 1
-      }
-      for (thread <- threadArray) {
-        thread.join()
-      }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
-    } finally {
-      executor.shutdown()
-      try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
-                       s" ${ e.getMessage }")
-      }
-    }
-  }
-}
-
-case class dropPartitionThread(sqlContext: SQLContext,
-    carbonLoadModel: CarbonLoadModel,
-    executor: ExecutorService,
-    segmentId: String,
-    partitionId: String,
-    dropWithData: Boolean,
-    oldPartitionIds: List[Int]) extends Thread {
-
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-
-  override def run(): Unit = {
-    try {
-      executeDroppingPartition(sqlContext, carbonLoadModel, executor,
-        segmentId, partitionId, dropWithData, oldPartitionIds)
-    } catch {
-      case e: Exception =>
-        val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }")
-    }
-  }
-
-  private def executeDroppingPartition(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      executor: ExecutorService,
-      segmentId: String,
-      partitionId: String,
-      dropWithData: Boolean,
-      oldPartitionIds: List[Int]): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val model = new DropPartitionCallableModel(carbonLoadModel,
-      segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
-    val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
-    try {
-      future.get
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
-        throw e
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
deleted file mode 100644
index 21871f3..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.partition
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent.{Executors, ExecutorService, Future}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand, SplitPartitionCallableModel}
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.partition.SplitPartitionCallable
-
-/**
- * Command for Alter Table Add & Split partition
- * Add is a special case of Splitting the default partition (part0)
- */
-case class AlterTableSplitCarbonPartitionCommand(
-    splitPartitionModel: AlterTableSplitPartitionModel)
-  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
-
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-  private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
-
-  // TODO will add rollback function in case of process data failure
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processSchema(sparkSession)
-    processData(sparkSession)
-  }
-
-  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
-    val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val tableName = splitPartitionModel.tableName
-    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-    val tablePath = relation.carbonTable.getTablePath
-    if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
-    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
-      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
-      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
-    }
-    val table = relation.carbonTable
-    val partitionInfo = table.getPartitionInfo(tableName)
-    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
-    // keep a copy of partitionIdList before update partitionInfo.
-    // will be used in partition data scan
-    oldPartitionIds.addAll(partitionIds.asJava)
-
-    if (partitionInfo == null) {
-      sys.error(s"Table $tableName is not a partition table.")
-    }
-    if (partitionInfo.getPartitionType == PartitionType.HASH) {
-      sys.error(s"Hash partition table cannot be added or split!")
-    }
-
-    updatePartitionInfo(partitionInfo, partitionIds)
-
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
-    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
-      dbName, tableName, tablePath)
-    val tableSchema = wrapperTableInfo.getFactTable
-    tableSchema.setPartitionInfo(partitionInfo)
-    wrapperTableInfo.setFactTable(tableSchema)
-    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
-    val thriftTable =
-      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
-      dbName, tableName, tablePath)
-    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
-    // update the schema modified time
-    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
-    sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, Option(dbName)))
-    Seq.empty
-  }
-
-  private def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIds: List[Int]): Unit = {
-    val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
-        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-
-    val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-
-    PartitionUtils.updatePartitionInfo(
-      partitionInfo,
-      partitionIds,
-      splitPartitionModel.partitionId.toInt,
-      splitPartitionModel.splitInfo,
-      timestampFormatter,
-      dateFormatter)
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
-    val tableName = splitPartitionModel.tableName
-    var locks = List.empty[ICarbonLock]
-    var success = false
-    try {
-      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
-        LockUsage.COMPACTION_LOCK,
-        LockUsage.DELETE_SEGMENT_LOCK,
-        LockUsage.DROP_TABLE_LOCK,
-        LockUsage.CLEAN_FILES_LOCK,
-        LockUsage.ALTER_PARTITION_LOCK)
-      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
-        locksToBeAcquired)(sparkSession)
-      val carbonLoadModel = new CarbonLoadModel()
-      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-      val tablePath = table.getTablePath
-      val dataLoadSchema = new CarbonDataLoadSchema(table)
-      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      carbonLoadModel.setTableName(table.getTableName)
-      carbonLoadModel.setDatabaseName(table.getDatabaseName)
-      carbonLoadModel.setTablePath(tablePath)
-      val loadStartTime = CarbonUpdateUtil.readCurrentTime
-      carbonLoadModel.setFactTimeStamp(loadStartTime)
-      alterTableSplitPartition(
-        sparkSession.sqlContext,
-        splitPartitionModel.partitionId.toInt.toString,
-        carbonLoadModel,
-        oldPartitionIds.asScala.toList
-      )
-      success = true
-    } catch {
-      case e: Exception =>
-        success = false
-        sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
-    } finally {
-      AlterTableUtil.releaseLocks(locks)
-      CacheProvider.getInstance().dropAllCache()
-      val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-      LOGGER.info("Locks released after alter table add/split partition action.")
-      LOGGER.audit("Locks released after alter table add/split partition action.")
-      if (success) {
-        LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
-        LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
-      }
-    }
-    Seq.empty
-  }
-
-  private def alterTableSplitPartition(
-      sqlContext: SQLContext,
-      partitionId: String,
-      carbonLoadModel: CarbonLoadModel,
-      oldPartitionIdList: List[Int]
-  ): Unit = {
-    LOGGER.audit(s"Add partition request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-    try {
-      startSplitThreads(sqlContext,
-        carbonLoadModel,
-        partitionId,
-        oldPartitionIdList)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in start splitting partition thread. ${ e.getMessage }")
-        throw e
-    }
-  }
-
-  private def startSplitThreads(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      partitionId: String,
-      oldPartitionIdList: List[Int]): Unit = {
-    val numberOfCores = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
-        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
-    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
-    try {
-      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
-      val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
-      var i = 0
-      validSegments.foreach { segmentId =>
-        threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
-          segmentId, partitionId, oldPartitionIdList)
-        threadArray(i).start()
-        i += 1
-      }
-      threadArray.foreach {
-        thread => thread.join()
-      }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
-        throw e
-    } finally {
-      executor.shutdown()
-      try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
-                       s" ${ e.getMessage }")
-      }
-    }
-  }
-}
-
-case class SplitThread(sqlContext: SQLContext,
-    carbonLoadModel: CarbonLoadModel,
-    executor: ExecutorService,
-    segmentId: String,
-    partitionId: String,
-    oldPartitionIdList: List[Int]) extends Thread {
-
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-
-  override def run(): Unit = {
-    var triggeredSplitPartitionStatus = false
-    var exception: Exception = null
-    try {
-      executePartitionSplit(sqlContext,
-        carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
-      triggeredSplitPartitionStatus = true
-    } catch {
-      case e: Exception =>
-        val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-        LOGGER.error(s"Exception in partition split thread: ${ e.getMessage } }")
-        exception = e
-    }
-    if (!triggeredSplitPartitionStatus) {
-      throw new Exception("Exception in split partition " + exception.getMessage)
-    }
-  }
-
-  private def executePartitionSplit( sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      executor: ExecutorService,
-      segment: String,
-      partitionId: String,
-      oldPartitionIdList: List[Int]): Unit = {
-    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
-      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
-    )
-    scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
-      sqlContext, carbonLoadModel, oldPartitionIdList)
-    try {
-      futureList.asScala.foreach { future =>
-        future.get
-      }
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
-        throw e
-    }
-  }
-
-  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
-      executor: ExecutorService,
-      segmentId: String,
-      partitionId: String,
-      sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      oldPartitionIdList: List[Int]): Unit = {
-
-    val splitModel = SplitPartitionCallableModel(carbonLoadModel,
-      segmentId,
-      partitionId,
-      oldPartitionIdList,
-      sqlContext)
-
-    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
-    futureList.add(future)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
new file mode 100644
index 0000000..69aa91a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.DropPartitionCallable
+
+case class CarbonAlterTableDropPartitionCommand(
+    model: AlterTableDropPartitionModel)
+  extends AtomicRunnableCommand {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    if (model.partitionId.equals("0")) {
+      sys.error(s"Cannot drop default partition! Please use delete statement!")
+    }
+    val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = model.tableName
+    val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val tablePath = relation.carbonTable.getTablePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
+      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    }
+    val table = relation.carbonTable
+    val partitionInfo = table.getPartitionInfo(tableName)
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of the partition id list before updating partitionInfo;
+    // it will be used in the partition data scan
+    oldPartitionIds.addAll(partitionIds.asJava)
+    val partitionIndex = partitionIds.indexOf(Integer.valueOf(model.partitionId))
+    partitionInfo.getPartitionType match {
+      case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!")
+      case PartitionType.RANGE =>
+        val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
+        val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
+        rangeInfo.remove(rangeToRemove)
+        partitionInfo.setRangeInfo(rangeInfo)
+      case PartitionType.LIST =>
+        val listInfo = new util.ArrayList(partitionInfo.getListInfo)
+        val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
+        listInfo.remove(listToRemove)
+        partitionInfo.setListInfo(listInfo)
+      case PartitionType.RANGE_INTERVAL =>
+        sys.error(s"Dropping range interval partition isn't support yet!")
+    }
+    partitionInfo.dropPartition(partitionIndex)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, tablePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+    thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+      .setTime_stamp(System.currentTimeMillis)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, tablePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
+    // sparkSession.catalog.refreshTable(tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = model.tableName
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+        LockUsage.COMPACTION_LOCK,
+        LockUsage.DELETE_SEGMENT_LOCK,
+        LockUsage.DROP_TABLE_LOCK,
+        LockUsage.CLEAN_FILES_LOCK,
+        LockUsage.ALTER_PARTITION_LOCK)
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      // Need to fill dimension relation
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setDatabaseName(table.getDatabaseName)
+      carbonLoadModel.setTablePath(table.getTablePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      alterTableDropPartition(
+        sparkSession.sqlContext,
+        model.partitionId,
+        carbonLoadModel,
+        model.dropWithData,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage } ")
+        success = false
+    } finally {
+      CacheProvider.getInstance().dropAllCache()
+      AlterTableUtil.releaseLocks(locks)
+      LOGGER.info("Locks released after alter table drop partition action.")
+      LOGGER.audit("Locks released after alter table drop partition action.")
+    }
+    LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
+    LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
+    Seq.empty
+  }
+
+  private def alterTableDropPartition(sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    LOGGER.audit(s"Drop partition request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    try {
+      startDropThreads(
+        sqlContext,
+        carbonLoadModel,
+        partitionId,
+        dropWithData,
+        oldPartitionIds)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start dropping partition thread. ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def startDropThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
+      var i = 0
+      for (segmentId: String <- validSegments) {
+        threadArray(i) = DropPartitionThread(sqlContext, carbonLoadModel, executor,
+          segmentId, partitionId, dropWithData, oldPartitionIds)
+        threadArray(i).start()
+        i += 1
+      }
+      for (thread <- threadArray) {
+        thread.join()
+      }
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      refresher.refreshSegments(validSegments.asJava)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+}
+
+case class DropPartitionThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    dropWithData: Boolean,
+    oldPartitionIds: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    try {
+      executeDroppingPartition(sqlContext, carbonLoadModel, executor,
+        segmentId, partitionId, dropWithData, oldPartitionIds)
+    } catch {
+      case e: Exception =>
+        val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }")
+    }
+  }
+
+  private def executeDroppingPartition(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val model = new DropPartitionCallableModel(carbonLoadModel,
+      segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
+    val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
+    try {
+      future.get
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+        throw e
+    }
+  }
+}
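
For reference, this command backs the partition-drop DDL. A hedged usage sketch
(table name and partition id are hypothetical; the statement follows CarbonData's
ALTER TABLE ... DROP PARTITION syntax, where the optional WITH DATA clause maps to
the dropWithData flag in AlterTableDropPartitionModel):

    // Drop partition 2 and also delete its data (hypothetical table name)
    sparkSession.sql("ALTER TABLE sales DROP PARTITION(2) WITH DATA")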

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
new file mode 100644
index 0000000..338ec5a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.SplitPartitionCallable
+
+/**
+ * Command for Alter Table Add &amp; Split partition.
+ * Add is a special case of splitting the default partition (part0).
+ */
+case class CarbonAlterTableSplitPartitionCommand(
+    splitPartitionModel: AlterTableSplitPartitionModel)
+  extends AtomicRunnableCommand {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val tableName = splitPartitionModel.tableName
+    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val tablePath = relation.carbonTable.getTablePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
+      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    }
+    val table = relation.carbonTable
+    val partitionInfo = table.getPartitionInfo(tableName)
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of the partition id list before updating partitionInfo;
+    // it will be used in the partition data scan
+    oldPartitionIds.addAll(partitionIds.asJava)
+
+    if (partitionInfo.getPartitionType == PartitionType.HASH) {
+      sys.error(s"Hash partition table cannot be added or split!")
+    }
+
+    updatePartitionInfo(partitionInfo, partitionIds)
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, tablePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, tablePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime()
+    sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, Option(dbName)))
+    Seq.empty
+  }
+
+  private def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIds: List[Int]): Unit = {
+    val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+
+    val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+
+    PartitionUtils.updatePartitionInfo(
+      partitionInfo,
+      partitionIds,
+      splitPartitionModel.partitionId.toInt,
+      splitPartitionModel.splitInfo,
+      timestampFormatter,
+      dateFormatter)
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+    val tableName = splitPartitionModel.tableName
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+        LockUsage.COMPACTION_LOCK,
+        LockUsage.DELETE_SEGMENT_LOCK,
+        LockUsage.DROP_TABLE_LOCK,
+        LockUsage.CLEAN_FILES_LOCK,
+        LockUsage.ALTER_PARTITION_LOCK)
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      val tablePath = table.getTablePath
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setDatabaseName(table.getDatabaseName)
+      carbonLoadModel.setTablePath(tablePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      alterTableSplitPartition(
+        sparkSession.sqlContext,
+        splitPartitionModel.partitionId.toInt.toString,
+        carbonLoadModel,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        success = false
+        sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
+    } finally {
+      AlterTableUtil.releaseLocks(locks)
+      CacheProvider.getInstance().dropAllCache()
+      val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+      LOGGER.info("Locks released after alter table add/split partition action.")
+      LOGGER.audit("Locks released after alter table add/split partition action.")
+      if (success) {
+        LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
+        LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+      }
+    }
+    Seq.empty
+  }
+
+  private def alterTableSplitPartition(
+      sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]
+  ): Unit = {
+    LOGGER.audit(s"Add partition request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    try {
+      startSplitThreads(sqlContext,
+        carbonLoadModel,
+        partitionId,
+        oldPartitionIdList)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start splitting partition thread. ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def startSplitThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
+      var i = 0
+      validSegments.foreach { segmentId =>
+        threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
+          segmentId, partitionId, oldPartitionIdList)
+        threadArray(i).start()
+        i += 1
+      }
+      threadArray.foreach {
+        thread => thread.join()
+      }
+      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      refresher.refreshSegments(validSegments.asJava)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
+        throw e
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+}
+
+case class SplitThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    oldPartitionIdList: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    var triggeredSplitPartitionStatus = false
+    var exception: Exception = null
+    try {
+      executePartitionSplit(sqlContext,
+        carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
+      triggeredSplitPartitionStatus = true
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in partition split thread: ${ e.getMessage }")
+        exception = e
+    }
+    if (!triggeredSplitPartitionStatus) {
+      throw new Exception("Exception in split partition: " + exception.getMessage, exception)
+    }
+  }
+
+  private def executePartitionSplit(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segment: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
+    )
+    scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
+      sqlContext, carbonLoadModel, oldPartitionIdList)
+    try {
+      futureList.asScala.foreach { future =>
+        future.get
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition split thread: ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]): Unit = {
+
+    val splitModel = SplitPartitionCallableModel(carbonLoadModel,
+      segmentId,
+      partitionId,
+      oldPartitionIdList,
+      sqlContext)
+
+    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
+    futureList.add(future)
+  }
+}
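
For readers following the split flow above: startSplitThreads fans out one SplitThread per valid segment over a shared fixed-size pool, joins them all, and only then refreshes the segment cache and cleans up partial loads. Below is a minimal stand-alone sketch of that fan-out/join shape; SegmentWorker, SplitFanOut, and the pool size are invented for illustration and are not CarbonData APIs.

  import java.util.concurrent.{Callable, ExecutorService, Executors, Future}

  // Illustrative only: one worker thread per segment, all sharing one fixed pool.
  case class SegmentWorker(executor: ExecutorService, segmentId: String) extends Thread {
    override def run(): Unit = {
      // stand-in for submitting a SplitPartitionCallable
      val future: Future[Unit] = executor.submit(new Callable[Unit] {
        override def call(): Unit = println(s"splitting segment $segmentId")
      })
      future.get() // surfaces any exception thrown by the callable
    }
  }

  object SplitFanOut {
    def main(args: Array[String]): Unit = {
      val executor = Executors.newFixedThreadPool(2) // cf. NUM_CORES_ALT_PARTITION
      try {
        val workers = Seq("0", "1", "2").map(SegmentWorker(executor, _))
        workers.foreach(_.start())
        workers.foreach(_.join()) // wait for every segment before any metadata refresh
      } finally {
        executor.shutdown()
      }
    }
  }

Note that in the sketch, as in the command, an exception thrown inside a worker's run() does not propagate through join(); it surfaces only in the worker thread itself, so callers still need to consult the logs.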

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
new file mode 100644
index 0000000..ed50835
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Command to show the partitions of a carbon table
+ */
+private[sql] case class CarbonShowCarbonPartitionsCommand(
+    tableIdentifier: TableIdentifier)
+  extends MetadataCommand {
+
+  override val output: Seq[Attribute] = CommonUtil.partitionInfoOutput
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    val carbonTable = relation.carbonTable
+    val tableName = carbonTable.getTableName
+    val partitionInfo = carbonTable.getPartitionInfo(
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+    if (partitionInfo == null) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
+    }
+    val partitionType = partitionInfo.getPartitionType
+    val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
+    val LOGGER = LogServiceFactory.getLogService(CarbonShowCarbonPartitionsCommand.getClass.getName)
+    LOGGER.info("partition column name:" + columnName)
+    CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
+  }
+}
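
The command above backs SHOW PARTITIONS for carbon tables and raises an AnalysisException when the table has no partition info. A short usage sketch follows; the session value and table names are assumptions, not part of this patch.

  // Assumes `spark` is an already-built CarbonSession; db1.sales is a made-up
  // partitioned carbon table.
  spark.sql("SHOW PARTITIONS db1.sales").show()
  // On a non-partitioned table the same call fails with:
  //   SHOW PARTITIONS is not allowed on a table that is not partitioned: <tableName>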

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
deleted file mode 100644
index 903e93b..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.partition
-
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.command.{RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Command for show table partitions Command
- */
-private[sql] case class ShowCarbonPartitionsCommand(
-    tableIdentifier: TableIdentifier)
-  extends RunnableCommand with SchemaProcessCommand {
-
-  override val output: Seq[Attribute] = CommonUtil.partitionInfoOutput
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processSchema(sparkSession)
-  }
-
-  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
-    val carbonTable = relation.carbonTable
-    val tableName = carbonTable.getTableName
-    val partitionInfo = carbonTable.getPartitionInfo(
-      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
-    if (partitionInfo == null) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
-    }
-    val partitionType = partitionInfo.getPartitionType
-    val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
-    val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
-    LOGGER.info("partition column name:" + columnName)
-    CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 3854f76..ddc2bde 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -14,101 +14,116 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.execution.command.preaaggregate
 
 import scala.collection.mutable
-import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
 /**
  * Below command class will be used to create pre-aggregate table
  * and updating the parent table about the child table information
- * Failure case:
+ * It either succeeds completely or leaves the system unchanged; failure cases:
  * 1. failed to create pre aggregate table.
  * 2. failed to update main table
  *
- * @param queryString
  */
 case class CreatePreAggregateTableCommand(
     dataMapName: String,
     parentTableIdentifier: TableIdentifier,
     dmClassName: String,
-    dmproperties: Map[String, String],
+    dmProperties: Map[String, String],
     queryString: String)
-  extends RunnableCommand with SchemaProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processSchema(sparkSession)
-  }
+  extends AtomicRunnableCommand {
 
-  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val df = sparkSession.sql(updatedQuery)
     val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
       df.logicalPlan, queryString)
     val fields = fieldRelationMap.keySet.toSeq
     val tableProperties = mutable.Map[String, String]()
-    dmproperties.foreach(t => tableProperties.put(t._1, t._2))
-    // Create the aggregation table name with parent table name prefix
-    val tableIdentifier = TableIdentifier(
-        parentTableIdentifier.table +"_" + dataMapName, parentTableIdentifier.database)
+    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
     // prepare table model of the collected tokens
-    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
+      ifNotExistPresent = false,
       new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
       tableIdentifier.table.toLowerCase,
       fields,
       Seq(),
       tableProperties,
       None,
-      false,
+      isAlterFlow = false,
       None)
 
-    // getting the parent table
     val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    // getting the table name
-    val parentTableName = parentTable.getTableName
-    // getting the db name of parent table
-    val parentDbName = parentTable.getDatabaseName
-
-    assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
+    assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table))
     // updating the relation identifier, this will be stored in child table
     // which can be used during dropping of pre-aggregate table as parent table will
     // also get updated
     tableModel.parentTable = Some(parentTable)
     tableModel.dataMapRelation = Some(fieldRelationMap)
     CarbonCreateTableCommand(tableModel).run(sparkSession)
-    try {
-      val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
-      val tableInfo = table.getTableInfo
-      // child schema object which will be updated on parent table about the
-      val childSchema = tableInfo.getFactTable
-        .buildChildSchema(dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
-        tableInfo.getDatabaseName, queryString, "AGGREGATION")
-      dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
-      // updating the parent table about child table
-      PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
-      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
-      if (loadAvailable) {
-        sparkSession.sql(
-          s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
-      }
-    } catch {
-      case e: Exception =>
-        CarbonDropTableCommand(
-          ifExistsSet = true,
-          Some( tableModel.databaseName ), tableModel.tableName ).run(sparkSession)
-        throw e
 
+    val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+    val tableInfo = table.getTableInfo
+    // build the child schema object, which will be registered with the parent table
+    val childSchema = tableInfo.getFactTable.buildChildSchema(
+      dataMapName,
+      CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
+      tableInfo.getDatabaseName,
+      queryString,
+      "AGGREGATION")
+    dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+
+    // updating the parent table about child table
+    PreAggregateUtil.updateMainTable(
+      CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
+      parentTableIdentifier.table,
+      childSchema,
+      sparkSession)
+
+    Seq.empty
+  }
+
+  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+    // drop child table (undoing the change in the main table's table info is still a TODO)
+    CarbonDropTableCommand(
+      ifExistsSet = true,
+      tableIdentifier.database,
+      tableIdentifier.table
+    ).run(sparkSession)
+
+    // TODO: undo the change in table info of main table
+
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // load child table if parent table has existing segments
+    val dbName = CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession)
+    val tableName = tableIdentifier.table
+    val metastorePath = CarbonEnv.getMetadataPath(Some(dbName), tableName)(sparkSession)
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(metastorePath).nonEmpty
+    if (loadAvailable) {
+      sparkSession.sql(s"insert into $dbName.$tableName $queryString")
     }
     Seq.empty
   }
+
+  // Create the aggregation table name with parent table name prefix
+  private lazy val tableIdentifier =
+    TableIdentifier(parentTableIdentifier.table + "_" + dataMapName, parentTableIdentifier.database)
+
 }
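
Taken together, the rewrite above splits the old monolithic run() into three phases: processMetadata creates the child table and registers it on the parent, processData performs the initial aggregate load, and a load failure triggers undoMetadata, which drops the half-created child table (rolling back the parent's table info is still marked TODO). A toy sketch of that contract against an in-memory catalog; every name below is invented and nothing in it touches CarbonData APIs.

  import scala.collection.mutable

  object PreAggCreateSketch {
    val catalog = mutable.Map[String, String]() // table name -> definition

    def processMetadata(parent: String, child: String, query: String): Unit = {
      catalog(child) = query                                 // create child table
      catalog(parent) = catalog(parent) + s" [child=$child]" // register child on parent
    }

    // drop the child; undoing the parent entry is left open, as in the patch
    def undoMetadata(child: String): Unit = catalog.remove(child)

    def processData(parentHasSegments: Boolean, child: String, query: String): Unit =
      if (parentHasSegments) println(s"insert into $child $query")

    def main(args: Array[String]): Unit = {
      val query = "select city, sum(amount) from sales group by city"
      catalog("sales") = "fact table"
      processMetadata("sales", "sales_agg", query)
      try {
        processData(parentHasSegments = true, "sales_agg", query)
      } catch {
        case e: Exception => undoMetadata("sales_agg"); throw e
      }
    }
  }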
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index f64deec..7bc120b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.preaaggregate
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.CarbonSession
-import org.apache.spark.sql.execution.command.CarbonDropTableCommand
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1ee8dd6..a2809ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -459,10 +459,6 @@ object PreAggregateUtil {
     }
   }
 
-  def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
-    SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).nonEmpty
-  }
-
   /**
    * Below method will be used to update logical plan
    * this is required for creating pre aggregate tables,
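
checkMainTableLoad is removed because CreatePreAggregateTableCommand.processData now performs the same emptiness check inline against the table's metadata path. A REPL-style fragment of that check; the path literal is made up, and in the patch the real value comes from CarbonEnv.getMetadataPath:

  import org.apache.carbondata.core.statusmanager.SegmentStatusManager

  // "/store/db1/sales/Metadata" is an invented metadata directory.
  val loadAvailable = SegmentStatusManager.readLoadMetadata("/store/db1/sales/Metadata").nonEmpty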

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala
deleted file mode 100644
index 26fe36b..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.format.TableInfo
-
-private[sql] case class AlterTableSetCommand(val tableIdentifier: TableIdentifier,
-                                             val properties: Map[String, String],
-                                             val isView: Boolean)
-  extends RunnableCommand with SchemaProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processSchema(sparkSession)
-  }
-
-  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    AlterTableUtil.modifyTableComment(tableIdentifier, properties, Nil,
-      true)(sparkSession)
-    Seq.empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala
deleted file mode 100644
index 10367a3..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.format.TableInfo
-
-private[sql] case class AlterTableUnsetCommand(val tableIdentifier: TableIdentifier,
-                                               val propKeys: Seq[String],
-                                               val ifExists: Boolean,
-                                               val isView: Boolean)
-  extends RunnableCommand with SchemaProcessCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processSchema(sparkSession)
-  }
-
-  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    AlterTableUtil.modifyTableComment(tableIdentifier, Map.empty[String, String],
-      propKeys, false)(sparkSession)
-    Seq.empty
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 0a720da..c8f998b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -20,9 +20,7 @@ package org.apache.spark.sql.execution.command.schema
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand}
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -36,9 +34,9 @@ import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropCo
 
 private[sql] case class CarbonAlterTableAddColumnCommand(
     alterTableAddColumnsModel: AlterTableAddColumnsModel)
-  extends RunnableCommand {
+  extends MetadataCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableAddColumnsModel.tableName
     val dbName = alterTableAddColumnsModel.databaseName
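
The same mechanical conversion repeats in the data-type-change, drop-column, and rename diffs that follow: extend MetadataCommand instead of RunnableCommand, delete the run() boilerplate, and move the body into processMetadata. A shape-only sketch; HypotheticalAlterCommand is invented, while MetadataCommand is the abstraction this patch introduces.

  package org.apache.spark.sql.execution.command.schema

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.execution.command.MetadataCommand

  // Invented command showing the post-refactor shape; not part of the patch.
  private[sql] case class HypotheticalAlterCommand(tableName: String)
    extends MetadataCommand {

    override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
      // schema-only mutation goes here; no segment data is read or written
      Seq.empty
    }
  }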

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 6e2c83d..dcee7c3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.command.schema
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, MetadataCommand}
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -34,9 +33,9 @@ import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
 
 private[sql] case class CarbonAlterTableDataTypeChangeCommand(
     alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
-  extends RunnableCommand {
+  extends MetadataCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableDataTypeChangeModel.tableName
     val dbName = alterTableDataTypeChangeModel.databaseName

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index bcc059f..40e1e15 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -21,8 +21,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
+import org.apache.spark.sql.hive.CarbonSessionState
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -36,9 +36,9 @@ import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
 
 private[sql] case class CarbonAlterTableDropColumnCommand(
     alterTableDropColumnModel: AlterTableDropColumnModel)
-  extends RunnableCommand {
+  extends MetadataCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableDropColumnModel.tableName
     val dbName = alterTableDropColumnModel.databaseName
@@ -137,9 +137,9 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       // event will be fired before dropping the columns
       val alterTableDropColumnPostEvent: AlterTableDropColumnPostEvent =
         AlterTableDropColumnPostEvent(
-        carbonTable,
-        alterTableDropColumnModel,
-        sparkSession)
+          carbonTable,
+          alterTableDropColumnModel,
+          sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPostEvent, operationContext)
 
       LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 594b92a..593a675 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.schema
 
 import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
 import org.apache.spark.util.AlterTableUtil
 
@@ -38,9 +38,9 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 private[sql] case class CarbonAlterTableRenameCommand(
     alterTableRenameModel: AlterTableRenameModel)
-  extends RunnableCommand {
+  extends MetadataCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
     val newTableIdentifier = alterTableRenameModel.newTableIdentifier