Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:11:45 UTC

[19/49] carbondata git commit: [CARBONDATA-1656][Streaming] Reject alter table command for streaming table

[CARBONDATA-1656][Streaming] Reject alter table command for streaming table

This closes #1448


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/87892522
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/87892522
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/87892522

Branch: refs/heads/fgdatamap
Commit: 87892522bfa73a4876e5cfe68bbf9d460a9a1f52
Parents: 4c41f86
Author: Jacky Li <ja...@qq.com>
Authored: Wed Nov 8 10:37:04 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Nov 8 16:04:04 2017 +0800

----------------------------------------------------------------------
 .../schema/AlterTableAddColumnCommand.scala     | 115 ------------
 .../AlterTableDataTypeChangeCommand.scala       | 116 -------------
 .../schema/AlterTableDropColumnCommand.scala    | 148 ----------------
 .../schema/AlterTableRenameTableCommand.scala   | 174 -------------------
 .../CarbonAlterTableAddColumnCommand.scala      | 115 ++++++++++++
 .../CarbonAlterTableDataTypeChangeCommand.scala | 116 +++++++++++++
 .../CarbonAlterTableDropColumnCommand.scala     | 148 ++++++++++++++++
 .../schema/CarbonAlterTableRenameCommand.scala  | 174 +++++++++++++++++++
 .../sql/execution/strategy/DDLStrategy.scala    |  10 +-
 .../strategy/StreamingTableStrategy.scala       |  39 ++++-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   8 +-
 .../TestStreamingTableOperation.scala           |  15 ++
 .../restructure/AlterTableRevertTestCase.scala  |   2 +-
 13 files changed, 610 insertions(+), 570 deletions(-)
----------------------------------------------------------------------
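
A note on the mechanism: the new StreamingTableStrategy (39 changed lines in
the diffstat above) is where the rejection happens. Before a Carbon ALTER
TABLE command is planned, the strategy checks whether the target is a
streaming table and fails fast if so; its full diff is not part of this
message. A minimal sketch of that pattern in Scala, assuming a
caller-supplied isStreaming check instead of CarbonData's actual metastore
lookup:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Sketch only, not CarbonData's API: the real strategy derives the
    // streaming flag from the table's metadata in the carbon metastore.
    object StreamingTableGuard {
      def rejectIfStreamingTable(
          table: TableIdentifier,
          operation: String,
          isStreaming: TableIdentifier => Boolean): Unit = {
        if (isStreaming(table)) {
          // fail fast, before any lock is acquired or schema file is touched
          throw new UnsupportedOperationException(
            s"$operation is not allowed for streaming table ${table.quotedString}")
        }
      }
    }

DDLStrategy (10 changed lines) presumably routes the four renamed ALTER
commands through this check, which is what the added
TestStreamingTableOperation cases exercise.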


http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
deleted file mode 100644
index 6e6a4b1..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
-
-private[sql] case class AlterTableAddColumnCommand(
-    alterTableAddColumnsModel: AlterTableAddColumnsModel)
-  extends RunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val tableName = alterTableAddColumnsModel.tableName
-    val dbName = alterTableAddColumnsModel.databaseName
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    var locks = List.empty[ICarbonLock]
-    var timeStamp = 0L
-    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
-    var carbonTable: CarbonTable = null
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      // Consider a concurrent scenario where two alter operations are executed in parallel: the
-      // first succeeds and updates the schema file, while the second acquires the lock only
-      // after the first completes. If the relation lookup happened before acquiring the lock,
-      // the second operation would see the stale carbon table and could leave the system in an
-      // inconsistent state. Therefore the relation lookup must be done after acquiring the lock
-      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
-      // get the latest carbon table and check for column existence
-      // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(thriftTableInfo,
-          dbName,
-          tableName,
-          carbonTable.getStorePath)
-      newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
-        dbName,
-        wrapperTableInfo,
-        carbonTablePath,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath, sparkSession.sparkContext).process
-      // generate dictionary files for the newly added columns
-      new AlterTableAddColumnRDD(sparkSession.sparkContext,
-        newCols,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
-      timeStamp = System.currentTimeMillis
-      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
-      schemaEvolutionEntry.setTimeStamp(timeStamp)
-      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
-      val thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
-          thriftTable)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
-      LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, "Alter table add columns failed")
-        if (newCols.nonEmpty) {
-          LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
-          new AlterTableDropColumnRDD(sparkSession.sparkContext,
-            newCols,
-            carbonTable.getCarbonTableIdentifier,
-            carbonTable.getStorePath).collect()
-          AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
-        }
-        sys.error(s"Alter table add operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty
-  }
-}
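
The command above (deleted here and re-added below under its Carbon-prefixed
name) shows the locking discipline its comment describes: acquire the
metadata and compaction locks first, look up the relation second, and release
the locks in a finally block. A self-contained sketch of that shape, with a
stand-in TableLock trait instead of ICarbonLock and AlterTableUtil:

    // Stand-in types: `acquire` plays the role of
    // AlterTableUtil.validateTableAndAcquireLock.
    trait TableLock { def release(): Unit }

    def withTableLocks[T](acquire: () => List[TableLock])(body: => T): T = {
      val locks = acquire()        // 1. serialize concurrent ALTER operations
      try {
        body                       // 2. only now read the (fresh) schema
      } finally {
        locks.foreach(_.release()) // 3. release on success and on failure
      }
    }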

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
deleted file mode 100644
index be87bbb..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
-
-private[sql] case class AlterTableDataTypeChangeCommand(
-    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
-  extends RunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val tableName = alterTableDataTypeChangeModel.tableName
-    val dbName = alterTableDataTypeChangeModel.databaseName
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    var locks = List.empty[ICarbonLock]
-    // get the latest carbon table and check for column existence
-    var carbonTable: CarbonTable = null
-    var timeStamp = 0L
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
-      val columnName = alterTableDataTypeChangeModel.columnName
-      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
-      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
-        LOGGER.audit(s"Alter table change data type request has failed. " +
-                     s"Column $columnName does not exist")
-        sys.error(s"Column does not exist: $columnName")
-      }
-      val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
-      if (carbonColumn.size == 1) {
-        CarbonScalaUtil
-          .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
-      } else {
-        LOGGER.audit(s"Alter table change data type request has failed. " +
-                     s"Column $columnName is invalid")
-        sys.error(s"Invalid Column: $columnName")
-      }
-      // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
-      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      // maintain the added column for schema evolution history
-      var addColumnSchema: ColumnSchema = null
-      var deletedColumnSchema: ColumnSchema = null
-      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
-      columnSchemaList.foreach { columnSchema =>
-        if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
-          deletedColumnSchema = columnSchema.deepCopy
-          columnSchema.setData_type(DataTypeConverterUtil
-            .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
-          columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
-          columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
-          addColumnSchema = columnSchema
-        }
-      }
-      timeStamp = System.currentTimeMillis
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
-      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
-      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-        .setTime_stamp(timeStamp) // reuse the timestamp recorded in the evolution entry
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaEvolutionEntry,
-          tableInfo)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
-      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
-    } catch {
-      case e: Exception => LOGGER
-        .error("Alter table change datatype failed : " + e.getMessage)
-        if (carbonTable != null) {
-          AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
-        }
-        sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty
-  }
-}
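
Worth noting in the command above: the modified column is deep-copied before
mutation, and the evolution entry records the old copy as removed and the
updated column as added under one timestamp, which is what the revert path
keys on. On a simplified model (plain case classes, not the Thrift-generated
ones), the bookkeeping looks like:

    // Simplified stand-ins for the Thrift ColumnSchema / SchemaEvolutionEntry.
    case class ColumnSchema(name: String, dataType: String,
        precision: Int, scale: Int)
    case class SchemaEvolutionEntry(timeStamp: Long,
        added: List[ColumnSchema], removed: List[ColumnSchema])

    def recordTypeChange(old: ColumnSchema, newType: String,
        precision: Int, scale: Int): SchemaEvolutionEntry = {
      val updated = old.copy(dataType = newType,
        precision = precision, scale = scale)
      // old copy goes to "removed", new copy to "added", one shared timestamp
      SchemaEvolutionEntry(System.currentTimeMillis, List(updated), List(old))
    }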

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
deleted file mode 100644
index 2f1e3d9..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
-
-private[sql] case class AlterTableDropColumnCommand(
-    alterTableDropColumnModel: AlterTableDropColumnModel)
-  extends RunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val tableName = alterTableDropColumnModel.tableName
-    val dbName = alterTableDropColumnModel.databaseName
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
-    var locks = List.empty[ICarbonLock]
-    var timeStamp = 0L
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    // get the latest carbon table and check for column existence
-    var carbonTable: CarbonTable = null
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      carbonTable = metastore
-        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-        .tableMeta.carbonTable
-      val partitionInfo = carbonTable.getPartitionInfo(tableName)
-      if (partitionInfo != null) {
-        val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
-          .map(_.getColumnName)
-        // check each column existence in the table
-        val partitionColumns = alterTableDropColumnModel.columns.filter {
-          tableColumn => partitionColumnSchemaList.contains(tableColumn)
-        }
-        if (partitionColumns.nonEmpty) {
-          throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
-                                                  s"$partitionColumns")
-        }
-      }
-      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
-      var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
-      .ColumnSchema]()
-      var keyColumnCountToBeDeleted = 0
-      // TODO: if the deleted column list includes a bucketed column, throw an error
-      alterTableDropColumnModel.columns.foreach { column =>
-        var columnExist = false
-        tableColumns.foreach { tableColumn =>
-          // column should not be already deleted and should exist in the table
-          if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
-            if (tableColumn.isDimension) {
-              keyColumnCountToBeDeleted += 1
-              if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
-                dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
-              }
-            }
-            columnExist = true
-          }
-        }
-        if (!columnExist) {
-          sys.error(s"Column $column does not exists in the table $dbName.$tableName")
-        }
-      }
-      // take the total key column count; the number of key columns to be deleted
-      // must be strictly less than the number of key columns in the schema
-      val totalKeyColumnInSchema = tableColumns.count {
-        tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
-      }
-      if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
-        sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
-      }
-      // read the latest schema file
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-        carbonTable.getCarbonTableIdentifier)
-      val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      // maintain the deleted columns for schema evolution history
-      var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
-      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
-      alterTableDropColumnModel.columns.foreach { column =>
-        columnSchemaList.foreach { columnSchema =>
-          if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
-            deletedColumnSchema += columnSchema.deepCopy
-            columnSchema.invisible = true
-          }
-        }
-      }
-      // add deleted columns to schema evolution history and update the schema
-      timeStamp = System.currentTimeMillis
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-      schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
-      AlterTableUtil
-        .updateSchemaInfo(carbonTable,
-          schemaEvolutionEntry,
-          tableInfo)(sparkSession,
-          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
-      // TODO: 1. add check for deletion of index tables
-      // delete dictionary files for dictionary column and clear dictionary cache from memory
-      new AlterTableDropColumnRDD(sparkSession.sparkContext,
-        dictionaryColumns,
-        carbonTable.getCarbonTableIdentifier,
-        carbonTable.getStorePath).collect()
-      LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
-      LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
-    } catch {
-      case e: Exception => LOGGER
-        .error("Alter table drop columns failed : " + e.getMessage)
-        if (carbonTable != null) {
-          AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
-        }
-        sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty
-  }
-}
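
The drop-column command above enforces two invariants before touching the
schema: none of the dropped columns may be a partition column, and at least
one key (dimension) column must survive the drop. Distilled onto a plain
column model (illustrative field names, not CarbonData's):

    case class Column(name: String, isDimension: Boolean,
        isPartitionColumn: Boolean)

    def validateDrop(tableColumns: Seq[Column], toDrop: Seq[String]): Unit = {
      val dropSet = toDrop.map(_.toLowerCase).toSet
      val dropped = tableColumns.filter(c => dropSet.contains(c.name.toLowerCase))
      require(!dropped.exists(_.isPartitionColumn),
        "Partition columns cannot be dropped")
      // at least one key column must remain after the drop
      val remainingKeys =
        tableColumns.count(_.isDimension) - dropped.count(_.isDimension)
      require(remainingKeys >= 1, "At least one key column should exist after drop")
    }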

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
deleted file mode 100644
index af361d5..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-private[sql] case class AlterTableRenameTableCommand(
-    alterTableRenameModel: AlterTableRenameModel)
-  extends RunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
-    val newTableIdentifier = alterTableRenameModel.newTableIdentifier
-    val oldDatabaseName = oldTableIdentifier.database
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    val newDatabaseName = newTableIdentifier.database
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
-      throw new MalformedCarbonCommandException("Database name should be same for both tables")
-    }
-    val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
-    if (tableExists) {
-      throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
-                                                s"already exists")
-    }
-    val oldTableName = oldTableIdentifier.table.toLowerCase
-    val newTableName = newTableIdentifier.table.toLowerCase
-    LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
-    LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
-    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val relation: CarbonRelation =
-      metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      LOGGER.audit(s"Rename table request has failed. " +
-                   s"Table $oldDatabaseName.$oldTableName does not exist")
-      sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
-    }
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
-      LockUsage.COMPACTION_LOCK,
-      LockUsage.DELETE_SEGMENT_LOCK,
-      LockUsage.CLEAN_FILES_LOCK,
-      LockUsage.DROP_TABLE_LOCK)
-    var locks = List.empty[ICarbonLock]
-    var timeStamp = 0L
-    var carbonTable: CarbonTable = null
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
-          sparkSession)
-      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].tableMeta
-      carbonTable = tableMeta.carbonTable
-      // get the latest carbon table and check for column existence
-      val carbonTablePath = CarbonStorePath.
-        getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
-      val tableMetadataFile = carbonTablePath.getPath
-      val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
-      schemaEvolutionEntry.setTableName(newTableName)
-      timeStamp = System.currentTimeMillis()
-      schemaEvolutionEntry.setTime_stamp(timeStamp)
-      renameBadRecords(oldTableName, newTableName, oldDatabaseName)
-      val fileType = FileFactory.getFileType(tableMetadataFile)
-      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
-          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-                       newTableName)
-        if (!rename) {
-          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
-          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
-        }
-      }
-      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
-        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = metastore.updateTableSchema(newTableIdentifier,
-        carbonTable.getCarbonTableIdentifier,
-        tableInfo,
-        schemaEvolutionEntry,
-        tableMeta.tablePath)(sparkSession)
-      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
-      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
-        .runSqlHive(
-          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
-          s"('tableName'='$newTableName', " +
-          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
-      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
-        Some(oldDatabaseName)).quotedString)
-      LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
-      LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
-    } catch {
-      case e: Exception =>
-        LOGGER.error(e, "Rename table failed: " + e.getMessage)
-        if (carbonTable != null) {
-          AlterTableUtil
-            .revertRenameTableChanges(oldTableIdentifier,
-              newTableName,
-              carbonTable.getStorePath,
-              carbonTable.getCarbonTableIdentifier.getTableId,
-              timeStamp)(
-              sparkSession)
-          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
-        }
-        sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-      // specific to rename: after the table is renamed, the old table path can no longer be found
-      if (carbonTable != null) {
-        AlterTableUtil
-          .releaseLocksManually(locks,
-            locksToBeAcquired,
-            oldDatabaseName,
-            newTableName,
-            carbonTable.getStorePath)
-      }
-    }
-    Seq.empty
-  }
-
-  private def renameBadRecords(
-      oldTableName: String,
-      newTableName: String,
-      dataBaseName: String): Unit = {
-    val oldPath = CarbonUtil
-      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
-    val newPath = CarbonUtil
-      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
-    val fileType = FileFactory.getFileType(oldPath)
-    if (FileFactory.isFileExist(oldPath, fileType)) {
-      val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
-        .renameForce(newPath)
-      if (!renameSuccess) {
-        sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
-      }
-    }
-  }
-
-}
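
Rename is the most side-effect-heavy of the four commands: it renames the
bad-records folder, force-renames the table folder on disk, rewrites the
schema, and updates the Hive metastore, and on failure it undoes whatever
already succeeded (renaming the bad-records folder back and reverting the
schema). The compensation pattern, sketched generically (the real command
reverts via AlterTableUtil.revertRenameTableChanges and renameBadRecords):

    import scala.collection.mutable.ListBuffer

    // Each step is an (apply, undo) pair; on failure, completed steps are
    // undone in reverse order before the error is rethrown.
    def applyWithRollback(steps: Seq[(() => Unit, () => Unit)]): Unit = {
      val undos = ListBuffer[() => Unit]()
      try {
        steps.foreach { case (apply, undo) =>
          apply()             // perform the side effect
          undos.prepend(undo) // remember its compensation, newest first
        }
      } catch {
        case e: Exception =>
          undos.foreach(_.apply()) // roll back in reverse order
          throw e
      }
    }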

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
new file mode 100644
index 0000000..8737464
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
+
+private[sql] case class CarbonAlterTableAddColumnCommand(
+    alterTableAddColumnsModel: AlterTableAddColumnsModel)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableAddColumnsModel.tableName
+    val dbName = alterTableAddColumnsModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    var locks = List.empty[ICarbonLock]
+    var timeStamp = 0L
+    var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+    var carbonTable: CarbonTable = null
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      // Consider a concurrent scenario where two alter operations are executed in parallel: the
+      // first succeeds and updates the schema file, while the second acquires the lock only
+      // after the first completes. If the relation lookup happened before acquiring the lock,
+      // the second operation would see the stale carbon table and could leave the system in an
+      // inconsistent state. Therefore the relation lookup must be done after acquiring the lock
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      // get the latest carbon table and check for column existence
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(thriftTableInfo,
+          dbName,
+          tableName,
+          carbonTable.getStorePath)
+      newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
+        dbName,
+        wrapperTableInfo,
+        carbonTablePath,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath, sparkSession.sparkContext).process
+      // generate dictionary files for the newly added columns
+      new AlterTableAddColumnRDD(sparkSession.sparkContext,
+        newCols,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).collect()
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+      schemaEvolutionEntry.setTimeStamp(timeStamp)
+      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
+      val thriftTable = schemaConverter
+        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      AlterTableUtil
+        .updateSchemaInfo(carbonTable,
+          schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
+          thriftTable)(sparkSession,
+          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Alter table add columns failed")
+        if (newCols.nonEmpty) {
+          LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
+          new AlterTableDropColumnRDD(sparkSession.sparkContext,
+            newCols,
+            carbonTable.getCarbonTableIdentifier,
+            carbonTable.getStorePath).collect()
+          AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
+        sys.error(s"Alter table add operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty
+  }
+}
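
One more detail in the re-added command above: dictionary files for the new
columns are generated eagerly on the cluster (AlterTableAddColumnRDD), and if
a later step fails the same column list is replayed through the companion
drop RDD to delete those files before the schema change is reverted. The
create/cleanup pairing, as a generic sketch (illustrative names only):

    // Pair a distributed side effect with its inverse over the same inputs,
    // so the failure path can undo exactly what was created.
    def createWithCleanup[A](items: Seq[A])(create: Seq[A] => Unit)
        (cleanup: Seq[A] => Unit)(rest: => Unit): Unit = {
      create(items)
      try {
        rest
      } catch {
        case e: Exception =>
          if (items.nonEmpty) cleanup(items) // e.g. delete dictionary files
          throw e
      }
    }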

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
new file mode 100644
index 0000000..4e180c8
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
+
+private[sql] case class CarbonAlterTableDataTypeChangeCommand(
+    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableDataTypeChangeModel.tableName
+    val dbName = alterTableDataTypeChangeModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    var locks = List.empty[ICarbonLock]
+    // get the latest carbon table and check for column existence
+    var carbonTable: CarbonTable = null
+    var timeStamp = 0L
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      val columnName = alterTableDataTypeChangeModel.columnName
+      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
+      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
+        LOGGER.audit(s"Alter table change data type request has failed. " +
+                     s"Column $columnName does not exist")
+        sys.error(s"Column does not exist: $columnName")
+      }
+      val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
+      if (carbonColumn.size == 1) {
+        CarbonScalaUtil
+          .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
+      } else {
+        LOGGER.audit(s"Alter table change data type request has failed. " +
+                     s"Column $columnName is invalid")
+        sys.error(s"Invalid Column: $columnName")
+      }
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      // maintain the added column for schema evolution history
+      var addColumnSchema: ColumnSchema = null
+      var deletedColumnSchema: ColumnSchema = null
+      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+      columnSchemaList.foreach { columnSchema =>
+        if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
+          deletedColumnSchema = columnSchema.deepCopy
+          columnSchema.setData_type(DataTypeConverterUtil
+            .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+          columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
+          columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
+          addColumnSchema = columnSchema
+        }
+      }
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+        .setTime_stamp(timeStamp) // reuse the timestamp recorded in the evolution entry
+      AlterTableUtil
+        .updateSchemaInfo(carbonTable,
+          schemaEvolutionEntry,
+          tableInfo)(sparkSession,
+          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception => LOGGER
+        .error("Alter table change datatype failed : " + e.getMessage)
+        if (carbonTable != null) {
+          AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
+        sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
new file mode 100644
index 0000000..3ac23f7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
+
+private[sql] case class CarbonAlterTableDropColumnCommand(
+    alterTableDropColumnModel: AlterTableDropColumnModel)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableDropColumnModel.tableName
+    val dbName = alterTableDropColumnModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+    var locks = List.empty[ICarbonLock]
+    var timeStamp = 0L
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    // get the latest carbon table and check for column existence
+    var carbonTable: CarbonTable = null
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      val partitionInfo = carbonTable.getPartitionInfo(tableName)
+      if (partitionInfo != null) {
+        val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
+          .map(_.getColumnName)
+        // check each column existence in the table
+        val partitionColumns = alterTableDropColumnModel.columns.filter {
+          tableColumn => partitionColumnSchemaList.contains(tableColumn)
+        }
+        if (partitionColumns.nonEmpty) {
+          throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
+                                                  s"$partitionColumns")
+        }
+      }
+      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+      var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
+      .ColumnSchema]()
+      var keyColumnCountToBeDeleted = 0
+      // TODO: if the deleted column list includes a bucketed column, throw an error
+      alterTableDropColumnModel.columns.foreach { column =>
+        var columnExist = false
+        tableColumns.foreach { tableColumn =>
+          // column should not be already deleted and should exist in the table
+          if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
+            if (tableColumn.isDimension) {
+              keyColumnCountToBeDeleted += 1
+              if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
+                dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
+              }
+            }
+            columnExist = true
+          }
+        }
+        if (!columnExist) {
+          sys.error(s"Column $column does not exists in the table $dbName.$tableName")
+        }
+      }
+      // take the total key column count; the number of key columns to be deleted
+      // must be strictly less than the number of key columns in the schema
+      val totalKeyColumnInSchema = tableColumns.count {
+        tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
+      }
+      if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
+        sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
+      }
+      // read the latest schema file
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val tableInfo: org.apache.carbondata.format.TableInfo =
+        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      // maintain the deleted columns for schema evolution history
+      var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
+      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+      alterTableDropColumnModel.columns.foreach { column =>
+        columnSchemaList.foreach { columnSchema =>
+          if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
+            deletedColumnSchema += columnSchema.deepCopy
+            columnSchema.invisible = true
+          }
+        }
+      }
+      // add deleted columns to schema evolution history and update the schema
+      timeStamp = System.currentTimeMillis
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+      schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
+      AlterTableUtil
+        .updateSchemaInfo(carbonTable,
+          schemaEvolutionEntry,
+          tableInfo)(sparkSession,
+          sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      // TODO: 1. add check for deletion of index tables
+      // delete dictionary files for dictionary column and clear dictionary cache from memory
+      new AlterTableDropColumnRDD(sparkSession.sparkContext,
+        dictionaryColumns,
+        carbonTable.getCarbonTableIdentifier,
+        carbonTable.getStorePath).collect()
+      LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
+      LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
+    } catch {
+      case e: Exception => LOGGER
+        .error("Alter table drop columns failed : " + e.getMessage)
+        if (carbonTable != null) {
+          AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
+        sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
new file mode 100644
index 0000000..88cf212
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+private[sql] case class CarbonAlterTableRenameCommand(
+    alterTableRenameModel: AlterTableRenameModel)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
+    val newTableIdentifier = alterTableRenameModel.newTableIdentifier
+    val oldDatabaseName = oldTableIdentifier.database
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    val newDatabaseName = newTableIdentifier.database
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
+      throw new MalformedCarbonCommandException(
+        "Database name should be the same for both tables")
+    }
+    val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
+    if (tableExists) {
+      throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
+                                                s"already exists")
+    }
+    val oldTableName = oldTableIdentifier.table.toLowerCase
+    val newTableName = newTableIdentifier.table.toLowerCase
+    LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+    LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val relation: CarbonRelation =
+      metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"Rename table request has failed. " +
+                   s"Table $oldDatabaseName.$oldTableName does not exist")
+      sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
+    }
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+      LockUsage.COMPACTION_LOCK,
+      LockUsage.DELETE_SEGMENT_LOCK,
+      LockUsage.CLEAN_FILES_LOCK,
+      LockUsage.DROP_TABLE_LOCK)
+    var locks = List.empty[ICarbonLock]
+    var timeStamp = 0L
+    var carbonTable: CarbonTable = null
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
+          sparkSession)
+      val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+        .asInstanceOf[CarbonRelation].tableMeta
+      carbonTable = tableMeta.carbonTable
+      // get the latest carbon table and check for column existence
+      val carbonTablePath = CarbonStorePath.
+        getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
+      val tableMetadataFile = carbonTablePath.getPath
+      val tableInfo: org.apache.carbondata.format.TableInfo =
+        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+      schemaEvolutionEntry.setTableName(newTableName)
+      timeStamp = System.currentTimeMillis()
+      schemaEvolutionEntry.setTime_stamp(timeStamp)
+      renameBadRecords(oldTableName, newTableName, oldDatabaseName)
+      val fileType = FileFactory.getFileType(tableMetadataFile)
+      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+        val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+                       newTableName)
+        if (!rename) {
+          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
+        }
+      }
+      val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+        newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+      val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+        carbonTable.getCarbonTableIdentifier,
+        tableInfo,
+        schemaEvolutionEntry,
+        tableMeta.tablePath)(sparkSession)
+      metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
+      sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+        .runSqlHive(
+          s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+          s"('tableName'='$newTableName', " +
+          s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+        Some(oldDatabaseName)).quotedString)
+      LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
+      LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Rename table failed: " + e.getMessage)
+        if (carbonTable != null) {
+          AlterTableUtil
+            .revertRenameTableChanges(oldTableIdentifier,
+              newTableName,
+              carbonTable.getStorePath,
+              carbonTable.getCarbonTableIdentifier.getTableId,
+              timeStamp)(
+              sparkSession)
+          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+        }
+        sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+      // rename-specific: after the rename the old table path no longer exists,
+      // so the locks must also be released manually under the new table name
+      if (carbonTable != null) {
+        AlterTableUtil
+          .releaseLocksManually(locks,
+            locksToBeAcquired,
+            oldDatabaseName,
+            newTableName,
+            carbonTable.getStorePath)
+      }
+    }
+    Seq.empty
+  }
+
+  private def renameBadRecords(
+      oldTableName: String,
+      newTableName: String,
+      dataBaseName: String): Unit = {
+    val oldPath = CarbonUtil
+      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
+    val newPath = CarbonUtil
+      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
+    val fileType = FileFactory.getFileType(oldPath)
+    if (FileFactory.isFileExist(oldPath, fileType)) {
+      val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
+        .renameForce(newPath)
+      if (!renameSuccess) {
+        sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
+      }
+    }
+  }
+
+}
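
One nuance in the finally block above: a successful rename removes the old
table path, so releaseLocks alone is not enough and the locks are additionally
released via releaseLocksManually under the new table name. The DDL this
command implements, with illustrative names ('spark' is assumed to be a
Carbon-enabled SparkSession):

    spark.sql("ALTER TABLE olddb.old_table RENAME TO new_table")
    // a cross-database target such as RENAME TO otherdb.new_table is rejected
    // with MalformedCarbonCommandException before any work is done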

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index bdfaa5a..bf13e41 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand}
 import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
-import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand, CarbonAlterTableRenameCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 
 import org.apache.carbondata.core.util.CarbonUtil
@@ -56,7 +56,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
             sparkSession)
         if (isCarbonTable) {
           val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
-          ExecutedCommandExec(AlterTableRenameTableCommand(renameModel)) :: Nil
+          ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil
         } else {
           ExecutedCommandExec(alter) :: Nil
         }
@@ -98,7 +98,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           throw new MalformedCarbonCommandException(
             "Operation not allowed : " + altertablemodel.alterSql)
         }
-      case dataTypeChange@AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
+      case dataTypeChange@CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
             alterTableChangeDataTypeModel.databaseName))(sparkSession)
@@ -107,7 +107,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
-      case addColumn@AlterTableAddColumnCommand(alterTableAddColumnsModel) =>
+      case addColumn@CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) =>
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
             alterTableAddColumnsModel.databaseName))(sparkSession)
@@ -116,7 +116,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
-      case dropColumn@AlterTableDropColumnCommand(alterTableDropColumnModel) =>
+      case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) =>
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
             alterTableDropColumnModel.databaseName))(sparkSession)
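
Each branch in this strategy follows the same dispatch shape: if the
identifier resolves to a Carbon table in the carbonMetastore, the stock Spark
command is replaced by its Carbon counterpart wrapped in ExecutedCommandExec
(the physical node Spark uses to run a RunnableCommand); otherwise the plan
falls back to the original command for rename, or fails fast for schema
changes on plain Hive tables. A condensed sketch of the pattern, not literal
source:

    case cmd @ CarbonAlterTableDropColumnCommand(model) =>
      val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
        .tableExists(TableIdentifier(model.tableName, model.databaseName))(sparkSession)
      if (isCarbonTable) {
        ExecutedCommandExec(cmd) :: Nil
      } else {
        throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
      }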

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 0f0bc24..9ebf47e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -21,8 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec}
 import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -34,12 +35,36 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
-      case update@ProjectForUpdateCommand(_, tableIdentifier) =>
-        rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data update")
-        ExecutedCommandExec(update) :: Nil
-      case delete@ProjectForDeleteCommand(_, tableIdentifier, _) =>
-        rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Date delete")
-        ExecutedCommandExec(delete) :: Nil
+      case ProjectForUpdateCommand(_, tableIdentifier) =>
+        rejectIfStreamingTable(
+          DeleteExecution.getTableIdentifier(tableIdentifier),
+          "Data update")
+        Nil
+      case ProjectForDeleteCommand(_, tableIdentifier, _) =>
+        rejectIfStreamingTable(
+          DeleteExecution.getTableIdentifier(tableIdentifier),
+          "Date delete")
+        Nil
+      case CarbonAlterTableAddColumnCommand(model) =>
+        rejectIfStreamingTable(
+          new TableIdentifier(model.tableName, model.databaseName),
+          "Alter table add column")
+        Nil
+      case CarbonAlterTableDropColumnCommand(model) =>
+        rejectIfStreamingTable(
+          new TableIdentifier(model.tableName, model.databaseName),
+          "Alter table drop column")
+        Nil
+      case CarbonAlterTableDataTypeChangeCommand(model) =>
+        rejectIfStreamingTable(
+          new TableIdentifier(model.tableName, model.databaseName),
+          "Alter table change datatype")
+        Nil
+      case AlterTableRenameCommand(oldTableIdentifier, _, _) =>
+        rejectIfStreamingTable(
+          oldTableIdentifier,
+          "Alter rename table")
+        Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 9c87b8b..fc2ed41 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand}
 import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
-import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 import org.apache.spark.sql.types.StructField
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -326,7 +326,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             table.toLowerCase,
             columnName.toLowerCase,
             columnNameCopy.toLowerCase)
-        AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+        CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
     }
 
   protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
@@ -395,7 +395,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           tableModel.dimCols,
           tableModel.msrCols,
           tableModel.highcardinalitydims.getOrElse(Seq.empty))
-        AlterTableAddColumnCommand(alterTableAddColumnsModel)
+        CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
     }
 
   private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
@@ -419,7 +419,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName),
           table.toLowerCase,
           values.map(_.toLowerCase))
-        AlterTableDropColumnCommand(alterTableDropColumnModel)
+        CarbonAlterTableDropColumnCommand(alterTableDropColumnModel)
     }
 
   def getFields(schema: Seq[StructField]): Seq[Field] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index b733d4f..d5f9426 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -80,6 +80,21 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test blocking alter table operation on streaming table") {
+    intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show()
+    }
+    intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source DROP COLUMNS (c1)""").show()
+    }
+    intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source RENAME to t""").show()
+    }
+    intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source CHANGE c1 c1 int""").show()
+    }
+  }
+
   override def afterAll {
     sql("USE default")
     sql("DROP DATABASE IF EXISTS streaming CASCADE")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 29de05b..00170e2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -51,7 +51,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   }
 
   test("test to revert table name on failure") {
-    intercept[RuntimeException] {
+    val exception = intercept[RuntimeException] {
       new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir()
       sql("alter table reverttest rename to reverttest_fail")
       new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()