You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/12/20 16:21:47 UTC

carbondata git commit: [CARBONDATA-3149]Support alter table column rename

Repository: carbondata
Updated Branches:
  refs/heads/master 53dbb459d -> 34923db0e


[CARBONDATA-3149]Support alter table column rename

This PR adds support for the column rename feature in CarbonData. Carbon already supports datatype change, alter table add column and drop column. This PR uses the same DDL as datatype change to support column rename.

Any column can be renamed. Since the same DDL is used for rename and datatype change, both operations can be done together; in that case, the datatype change validation and limitations remain the same as before.

This closes #2990


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/34923db0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/34923db0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/34923db0

Branch: refs/heads/master
Commit: 34923db0e5d4565e841d7b3dddc3872d327263c6
Parents: 53dbb45
Author: akashrn5 <ak...@gamil.com>
Authored: Fri Dec 14 16:50:09 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Dec 21 00:21:30 2018 +0800

----------------------------------------------------------------------
 .../core/features/TableOperation.java           |   1 +
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   7 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   |   2 +
 .../lucene/LuceneFineGrainDataMapSuite.scala    |   5 +
 .../carbondata/events/AlterTableEvents.scala    |   4 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  23 +-
 .../command/carbonTableSchemaCommon.scala       |   8 +-
 .../spark/sql/hive/SqlAstBuilderHelper.scala    |  24 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   3 +-
 .../preaaggregate/PreAggregateListeners.scala   |  11 +-
 ...terTableColRenameDataTypeChangeCommand.scala | 324 ++++++++++++++++++
 .../CarbonAlterTableDataTypeChangeCommand.scala | 180 ----------
 .../sql/execution/strategy/DDLStrategy.scala    |  14 +-
 .../strategy/StreamingTableStrategy.scala       |  12 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  24 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  47 ++-
 .../bloom/BloomCoarseGrainDataMapSuite.scala    |   9 +-
 .../carbondata/TestStreamingTableOpName.scala   |   4 +
 .../AlterTableColumnRenameTestCase.scala        | 342 +++++++++++++++++++
 .../vectorreader/ChangeDataTypeTestCases.scala  |   2 +-
 20 files changed, 810 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java b/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java
index 3d13901..78c6355 100644
--- a/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java
+++ b/core/src/main/java/org/apache/carbondata/core/features/TableOperation.java
@@ -22,6 +22,7 @@ public enum TableOperation {
   ALTER_DROP,
   ALTER_ADD_COLUMN,
   ALTER_CHANGE_DATATYPE,
+  ALTER_COLUMN_RENAME,
   STREAMING,
   UPDATE,
   DELETE,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 4064d53..e730635 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -385,6 +385,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
         return false;
       case ALTER_CHANGE_DATATYPE:
         return true;
+      case ALTER_COLUMN_RENAME:
+        return true;
       case STREAMING:
         return false;
       case DELETE:
@@ -415,8 +417,9 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
         }
         return false;
       }
-      case ALTER_CHANGE_DATATYPE: {
-        // alter table change one column datatype
+      case ALTER_CHANGE_DATATYPE:
+      case ALTER_COLUMN_RENAME: {
+        // alter table change one column datatype, or rename
         // will be blocked if the column in bloomfilter datamap
         String columnToChangeDatatype = (String) targets[0];
         List<String> indexedColumnNames = dataMapMeta.getIndexedColumnNames();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 809d68b..d8a14d8 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -98,6 +98,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
         return true;
       case ALTER_CHANGE_DATATYPE:
         return true;
+      case ALTER_COLUMN_RENAME:
+        return true;
       case STREAMING:
         return false;
       case DELETE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 0230a0c..46ffefa 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -658,6 +658,11 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       sql("delete from datamap_test7 where name = 'n10'").show()
     }
     assert(ex6.getMessage.contains("Delete operation is not supported"))
+
+    val ex7 = intercept[MalformedCarbonCommandException] {
+      sql("alter table datamap_test7 change id test int")
+    }
+    assert(ex7.getMessage.contains("alter table column rename is not supported"))
   }
 
   ignore("test lucene fine grain multiple data map on table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 1810df6..c99237e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -42,7 +42,7 @@ case class AlterTableDropColumnPreEvent(
  * @param carbonTable
  * @param alterTableDataTypeChangeModel
  */
-case class AlterTableDataTypeChangePreEvent(
+case class AlterTableColRenameAndDataTypeChangePreEvent(
     sparkSession: SparkSession,
     carbonTable: CarbonTable,
         alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
@@ -54,7 +54,7 @@ case class AlterTableDataTypeChangePreEvent(
  * @param carbonTable
  * @param alterTableDataTypeChangeModel
  */
-case class AlterTableDataTypeChangePostEvent(
+case class AlterTableColRenameAndDataTypeChangePostEvent(
     sparkSession: SparkSession,
     carbonTable: CarbonTable,
     alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index b21aee1..35bf335 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1483,20 +1483,23 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   /**
    * This method will parse the given data type and validate against the allowed data types
    *
-   * @param dataType
-   * @param values
-   * @return
+   * @param dataType datatype string given by the user in DDL
+   * @param values values defined when the decimal datatype is given in DDL
+   * @return DataTypeInfo object with datatype, precision and scale
    */
-  def parseDataType(dataType: String, values: Option[List[(Int, Int)]]): DataTypeInfo = {
+  def parseDataType(
+      dataType: String,
+      values: Option[List[(Int, Int)]],
+      isColumnRename: Boolean): DataTypeInfo = {
+    var precision: Int = 0
+    var scale: Int = 0
     dataType match {
       case "bigint" | "long" =>
         if (values.isDefined) {
           throw new MalformedCarbonCommandException("Invalid data type")
         }
-        DataTypeInfo(dataType)
+        DataTypeInfo(DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
       case "decimal" =>
-        var precision: Int = 0
-        var scale: Int = 0
         if (values.isDefined) {
           precision = values.get(0)._1
           scale = values.get(0)._2
@@ -1511,7 +1514,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         }
         DataTypeInfo("decimal", precision, scale)
       case _ =>
-        throw new MalformedCarbonCommandException("Data type provided is invalid.")
+        if (isColumnRename) {
+          DataTypeInfo(DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
+        } else {
+          throw new MalformedCarbonCommandException("Data type provided is invalid.")
+        }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 14efabb..2ce9d89 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -171,11 +171,17 @@ case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
 
 case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
 
+class AlterTableColumnRenameModel(columnName: String,
+    newColumnName: String,
+    isColumnRename: Boolean)
+
 case class AlterTableDataTypeChangeModel(dataTypeInfo: DataTypeInfo,
     databaseName: Option[String],
     tableName: String,
     columnName: String,
-    newColumnName: String)
+    newColumnName: String,
+    isColumnRename: Boolean)
+  extends AlterTableColumnRenameModel(columnName, newColumnName, isColumnRename)
 
 case class AlterTableRenameModel(
     oldTableIdentifier: TableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
index 8f1477c..763a13e 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
@@ -19,16 +19,15 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext,ChangeColumnContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateTableContext, ShowTablesContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel,AlterTableDataTypeChangeModel}
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand}
 import org.apache.spark.sql.execution.command.table.{CarbonExplainCommand, CarbonShowTablesCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DecimalType
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
@@ -38,9 +37,10 @@ trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
   override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
 
     val newColumn = visitColType(ctx.colType)
-    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
-      throw new MalformedCarbonCommandException(
-        "Column names provided are different. Both the column names should be same")
+    val isColumnRename = if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      true
+    } else {
+      false
     }
 
     val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
@@ -48,15 +48,17 @@ trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
       case _ => (newColumn.dataType.typeName.toLowerCase, None)
     }
 
-    val alterTableChangeDataTypeModel =
-      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+    val alterTableColRenameAndDataTypeChangeModel =
+      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser()
+        .parseDataType(typeString, values, isColumnRename),
         new CarbonSpark2SqlParser()
           .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
         ctx.tableIdentifier().table.getText.toLowerCase,
         ctx.identifier.getText.toLowerCase,
-        newColumn.name.toLowerCase)
+        newColumn.name.toLowerCase,
+        isColumnRename)
 
-    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+    CarbonAlterTableColRenameDataTypeChangeCommand(alterTableColRenameAndDataTypeChangeModel)
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index e114e06..a7677d7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -166,7 +166,8 @@ object CarbonEnv {
       .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener)
       .addListener(classOf[AlterTableDropColumnPreEvent], PreAggregateDropColumnPreListener)
       .addListener(classOf[AlterTableRenamePreEvent], RenameTablePreListener)
-      .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener)
+      .addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
+        PreAggregateDataTypeChangePreListener)
       .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
       .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
       .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 3038c93..b2d21cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -729,9 +729,11 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
    * @param operationContext
    */
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val dataTypeChangePreListener = event.asInstanceOf[AlterTableDataTypeChangePreEvent]
-    val carbonTable = dataTypeChangePreListener.carbonTable
-    val alterTableDataTypeChangeModel = dataTypeChangePreListener.alterTableDataTypeChangeModel
+    val colRenameDataTypeChangePreListener = event
+      .asInstanceOf[AlterTableColRenameAndDataTypeChangePreEvent]
+    val carbonTable = colRenameDataTypeChangePreListener.carbonTable
+    val alterTableDataTypeChangeModel = colRenameDataTypeChangePreListener
+      .alterTableDataTypeChangeModel
     val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName
     if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
       val dataMapSchemas = carbonTable.getTableInfo.getDataMapSchemaList
@@ -748,7 +750,8 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
     }
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
-        s"Cannot change data type for columns in pre-aggregate table ${ carbonTable.getDatabaseName
+        s"Cannot change data type or rename column for columns in pre-aggregate table ${
+          carbonTable.getDatabaseName
         }.${ carbonTable.getTableName }")
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
new file mode 100644
index 0000000..49d5093
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo,
+  MetadataCommand}
+import org.apache.spark.sql.hive.CarbonSessionCatalog
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.features.TableOperation
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.datatype.DecimalType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent,
+  AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.DataTypeConverterUtil
+
+/**
+ * Base for column rename commands; provides the validation a rename request must pass.
+ */
+abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newColumnName: String)
+  extends MetadataCommand {
+
+  /**
+   * Throws MalformedCarbonCommandException when the new name is already used, when the old
+   * column is complex, or when the old column is a partition column of the table.
+   */
+  protected def validColumnsForRenaming(carbonColumns: mutable.Buffer[CarbonColumn],
+      oldCarbonColumn: CarbonColumn,
+      carbonTable: CarbonTable): Unit = {
+    // check whether new column name is already an existing column name
+    if (carbonColumns.exists(_.getColName.equalsIgnoreCase(newColumnName))) {
+      throw new MalformedCarbonCommandException(s"Column Rename Operation failed. New " +
+        s"column name $newColumnName already exists in table ${ carbonTable.getTableName }")
+    }
+
+    // if the column rename is for complex column, block the operation
+    if (oldCarbonColumn.isComplex) {
+      throw new MalformedCarbonCommandException(s"Column Rename Operation failed. Rename " +
+        s"column is unsupported for complex datatype column ${ oldCarbonColumn.getColName }")
+    }
+
+    // if column rename operation is on partition column, then fail the rename operation
+    if (null != carbonTable.getPartitionInfo) {
+      val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+      // report the column being renamed (the old name), since that is the partition column
+      if (partitionColumns.exists(_.getColumnName.equalsIgnoreCase(oldColumnName))) {
+        throw new MalformedCarbonCommandException(s"Column Rename Operation failed. " +
+          s"Renaming the partition column $oldColumnName is not allowed")
+      }
+    }
+  }
+}
+
+private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
+    alterTableColRenameAndDataTypeChangeModel: AlterTableDataTypeChangeModel,
+    childTableColumnRename: Boolean = false)
+  extends CarbonAlterTableColumnRenameCommand(alterTableColRenameAndDataTypeChangeModel.columnName,
+    alterTableColRenameAndDataTypeChangeModel.newColumnName) {
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val tableName = alterTableColRenameAndDataTypeChangeModel.tableName
+    val dbName = alterTableColRenameAndDataTypeChangeModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    var isDataTypeChange = false
+    setAuditTable(dbName, tableName)
+    setAuditInfo(Map(
+      "column" -> alterTableColRenameAndDataTypeChangeModel.columnName,
+      "newColumn" -> alterTableColRenameAndDataTypeChangeModel.newColumnName,
+      "newType" -> alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType))
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    var locks = List.empty[ICarbonLock]
+    // get the latest carbon table and check for column existence
+    var carbonTable: CarbonTable = null
+    var timeStamp = 0L
+    try {
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      if (!alterTableColRenameAndDataTypeChangeModel.isColumnRename &&
+          !carbonTable.canAllow(carbonTable, TableOperation.ALTER_CHANGE_DATATYPE,
+            alterTableColRenameAndDataTypeChangeModel.columnName)) {
+        throw new MalformedCarbonCommandException(
+          "alter table change datatype is not supported for index datamap")
+      }
+      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename &&
+          !carbonTable.canAllow(carbonTable, TableOperation.ALTER_COLUMN_RENAME,
+            alterTableColRenameAndDataTypeChangeModel.columnName)) {
+        throw new MalformedCarbonCommandException(
+          "alter table column rename is not supported for index datamap")
+      }
+      val operationContext = new OperationContext
+      operationContext.setProperty("childTableColumnRename", childTableColumnRename)
+      val alterTableColRenameAndDataTypeChangePreEvent =
+        AlterTableColRenameAndDataTypeChangePreEvent(sparkSession, carbonTable,
+          alterTableColRenameAndDataTypeChangeModel)
+      OperationListenerBus.getInstance()
+        .fireEvent(alterTableColRenameAndDataTypeChangePreEvent, operationContext)
+      val newColumnName = alterTableColRenameAndDataTypeChangeModel.newColumnName.toLowerCase
+      val oldColumnName = alterTableColRenameAndDataTypeChangeModel.columnName.toLowerCase
+      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
+      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(oldColumnName))) {
+        throwMetadataException(dbName, tableName, s"Column does not exist: $oldColumnName")
+      }
+
+      val oldCarbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(oldColumnName))
+      if (oldCarbonColumn.size != 1) {
+        throwMetadataException(dbName, tableName, s"Invalid Column: $oldColumnName")
+      }
+      val newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
+      val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
+      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
+        // validate the columns to be renamed
+        validColumnsForRenaming(carbonColumns, oldCarbonColumn.head, carbonTable)
+        // if the datatype is source datatype, then it is just a column rename operation, else set
+        // the isDataTypeChange flag to true
+        if (oldCarbonColumn.head.getDataType.getName
+          .equalsIgnoreCase(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType)) {
+          val newColumnPrecision =
+            alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
+          val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
+          // if the source datatype is decimal and there is change in precision and scale, then
+          // along with rename, datatype change is also required for the command, so set the
+          // isDataTypeChange flag to true in this case
+          if (oldCarbonColumn.head.getDataType.getName.equalsIgnoreCase("decimal") &&
+              (oldCarbonColumn.head.getDataType.asInstanceOf[DecimalType].getPrecision !=
+               newColumnPrecision ||
+               oldCarbonColumn.head.getDataType.asInstanceOf[DecimalType].getScale !=
+               newColumnScale)) {
+            isDataTypeChange = true
+          }
+        } else {
+          isDataTypeChange = true
+        }
+      } else {
+        isDataTypeChange = true
+      }
+      if (isDataTypeChange) {
+        validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
+          oldCarbonColumn.head)
+      }
+      // read the latest schema file
+      val tableInfo: TableInfo =
+        metaStore.getThriftTableInfo(carbonTable)
+      // maintain the added column for schema evolution history
+      var addColumnSchema: ColumnSchema = null
+      var deletedColumnSchema: ColumnSchema = null
+      val schemaEvolutionEntry: SchemaEvolutionEntry = null
+      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+
+      columnSchemaList.foreach { columnSchema =>
+        if (columnSchema.column_name.equalsIgnoreCase(oldColumnName)) {
+          deletedColumnSchema = columnSchema.deepCopy()
+          if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
+            // if only column rename, just get the column schema and rename, make a
+            // schemaEvolutionEntry
+            columnSchema.setColumn_name(newColumnName)
+          }
+          // if the column rename is false,it will be just datatype change only, then change the
+          // datatype and make an evolution entry, If both the operations are happening, then rename
+          // change datatype and make an evolution entry
+          if (isDataTypeChange) {
+            // if only datatype change,  just get the column schema and change datatype, make a
+            // schemaEvolutionEntry
+            columnSchema.setData_type(
+              DataTypeConverterUtil.convertToThriftDataType(
+                alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType))
+            columnSchema
+              .setPrecision(newColumnPrecision)
+            columnSchema.setScale(newColumnScale)
+          }
+          addColumnSchema = columnSchema
+          timeStamp = System.currentTimeMillis()
+          // make a new schema evolution entry after column rename or datatype change
+          AlterTableUtil
+            .addNewSchemaEvolutionEntry(schemaEvolutionEntry, timeStamp, addColumnSchema,
+              deletedColumnSchema)
+        }
+      }
+
+      // modify the table Properties with new column name if column rename happened
+      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
+        AlterTableUtil
+          .modifyTablePropertiesAfterColumnRename(tableInfo.fact_table.tableProperties.asScala,
+            oldColumnName, newColumnName)
+      }
+      updateSchemaAndRefreshTable(sparkSession,
+        carbonTable,
+        tableInfo,
+        addColumnSchema,
+        schemaEvolutionEntry)
+      val alterTableColRenameAndDataTypeChangePostEvent
+      : AlterTableColRenameAndDataTypeChangePostEvent =
+        AlterTableColRenameAndDataTypeChangePostEvent(sparkSession, carbonTable,
+          alterTableColRenameAndDataTypeChangeModel)
+      OperationListenerBus.getInstance
+        .fireEvent(alterTableColRenameAndDataTypeChangePostEvent, operationContext)
+      if (isDataTypeChange) {
+        LOGGER
+          .info(s"Alter table for column rename or data type change is successful for table " +
+                s"$dbName.$tableName")
+      }
+      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
+        LOGGER.info(s"Alter table for column rename is successful for table $dbName.$tableName")
+      }
+    } catch {
+      case e: Exception =>
+        if (carbonTable != null) {
+          AlterTableUtil
+            .revertColumnRenameAndDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
+        }
+        if (isDataTypeChange) {
+          throwMetadataException(dbName, tableName,
+            s"Alter table data type change operation failed: ${ e.getMessage }")
+        } else {
+          throwMetadataException(dbName, tableName,
+            s"Alter table data type change or column rename operation failed: ${ e.getMessage }")
+        }
+    } finally {
+      // release lock after command execution completion
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty
+  }
+
+  /**
+   * Writes the changed column into the table schema and then refreshes the table.
+   *
+   * @param sparkSession         session whose catalog is altered and refreshed
+   * @param carbonTable          table that is being altered
+   * @param tableInfo            thrift table info passed on to AlterTableUtil.updateSchemaInfo
+   * @param addColumnSchema      thrift column schema of the column after the change
+   * @param schemaEvolutionEntry evolution entry passed on to AlterTableUtil.updateSchemaInfo
+   */
+  private def updateSchemaAndRefreshTable(sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      tableInfo: TableInfo,
+      addColumnSchema: ColumnSchema,
+      schemaEvolutionEntry: SchemaEvolutionEntry): Unit = {
+    // updateSchemaInfo is handed the wrapper form of the thrift column schema
+    val wrapperColumn =
+      new ThriftWrapperSchemaConverterImpl().fromExternalToWrapperColumnSchema(addColumnSchema)
+    val (alteredTable, schemaParts, columns) = AlterTableUtil.updateSchemaInfo(
+      carbonTable, schemaEvolutionEntry, tableInfo, Some(List(wrapperColumn)))(sparkSession)
+    val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+    sessionCatalog.alterColumnChangeDataType(alteredTable, schemaParts, columns)
+    sparkSession.catalog.refreshTable(alteredTable.quotedString)
+  }
+
+  /**
+   * Checks whether the requested data type is an allowed change for the given column and
+   * aborts through sys.error when it is not. This method only validates, it changes nothing.
+   *
+   * Allowed changes are INT to bigint/long, and DECIMAL to a DECIMAL with a strictly higher
+   * precision, a scale that is not lower, and a difference of precision and scale that is not
+   * smaller than the existing one.
+   *
+   * @param dataTypeInfo requested data type together with its precision and scale
+   * @param carbonColumn existing column whose data type is to be changed
+   */
+  private def validateColumnDataType(
+      dataTypeInfo: DataTypeInfo,
+      carbonColumn: CarbonColumn): Unit = {
+    val existingTypeName = carbonColumn.getDataType.getName
+    val columnName = carbonColumn.getColName
+    // common beginnings of the error messages raised below
+    val typedColumn = s"Given column $columnName with data type $existingTypeName"
+    val plainColumn = s"Given column $columnName cannot be modified. "
+    existingTypeName match {
+      case "INT" =>
+        val requestedType = dataTypeInfo.dataType
+        val widensInt = requestedType.equalsIgnoreCase("bigint") ||
+                        requestedType.equalsIgnoreCase("long")
+        if (!widensInt) {
+          sys.error(typedColumn + " cannot be modified. " +
+                    "Int can only be changed to bigInt or long")
+        }
+      case "DECIMAL" =>
+        // sys.error never returns, so each failed check below ends the validation
+        if (!dataTypeInfo.dataType.equalsIgnoreCase("decimal")) {
+          sys.error(typedColumn + " cannot be modified." +
+                    " Decimal can be only be changed to Decimal of higher precision")
+        }
+        val currentPrecision = carbonColumn.getColumnSchema.getPrecision
+        if (dataTypeInfo.precision <= currentPrecision) {
+          sys.error(plainColumn +
+                    s"Specified precision value ${ dataTypeInfo.precision } should be " +
+                    s"greater than current precision value $currentPrecision")
+        }
+        val currentScale = carbonColumn.getColumnSchema.getScale
+        if (dataTypeInfo.scale < currentScale) {
+          sys.error(plainColumn +
+                    s"Specified scale value ${ dataTypeInfo.scale } should be greater or " +
+                    s"equal to current scale value $currentScale")
+        }
+        // the difference of the requested precision and scale must not be smaller than the
+        // existing difference, otherwise already stored values would lose data
+        val requestedPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale
+        if (requestedPrecisionScaleDiff < currentPrecision - currentScale) {
+          sys.error(plainColumn +
+                    "Specified precision and scale values will lead to data loss")
+        }
+      case _ =>
+        sys.error(typedColumn + " cannot be modified. " +
+                  "Only Int and Decimal data types are allowed for modification")
+    }
+  }
+
+  // Operation name of this command; overrides the opName member of the parent type.
+  // NOTE(review): presumably consumed by audit logging - confirm against the parent type.
+  override protected def opName: String = "ALTER TABLE CHANGE DATA TYPE OR RENAME COLUMN"
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
deleted file mode 100644
index 23a7615..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo, MetadataCommand}
-import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.features.TableOperation
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.util.DataTypeConverterUtil
-
-private[sql] case class CarbonAlterTableDataTypeChangeCommand(
-    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
-  extends MetadataCommand {
-
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val tableName = alterTableDataTypeChangeModel.tableName
-    val dbName = alterTableDataTypeChangeModel.databaseName
-      .getOrElse(sparkSession.catalog.currentDatabase)
-    setAuditTable(dbName, tableName)
-    setAuditInfo(Map(
-      "column" -> alterTableDataTypeChangeModel.columnName,
-      "newType" -> alterTableDataTypeChangeModel.dataTypeInfo.dataType))
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
-    var locks = List.empty[ICarbonLock]
-    // get the latest carbon table and check for column existence
-    var carbonTable: CarbonTable = null
-    var timeStamp = 0L
-    try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
-      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-      carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-      if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_CHANGE_DATATYPE,
-        alterTableDataTypeChangeModel.columnName)) {
-        throw new MalformedCarbonCommandException(
-          "alter table change datatype is not supported for index datamap")
-      }
-      val operationContext = new OperationContext
-      val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(sparkSession,
-        carbonTable, alterTableDataTypeChangeModel)
-      OperationListenerBus.getInstance()
-        .fireEvent(alterTableDataTypeChangeListener, operationContext)
-      val columnName = alterTableDataTypeChangeModel.columnName
-      val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
-      if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
-        throwMetadataException(dbName, tableName, s"Column does not exist: $columnName")
-      }
-      val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
-      if (carbonColumn.size == 1) {
-        validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
-      } else {
-        throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
-      }
-      // read the latest schema file
-      val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTable)
-      // maintain the added column for schema evolution history
-      var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
-      var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null
-      val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
-      columnSchemaList.foreach { columnSchema =>
-        if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
-          deletedColumnSchema = columnSchema.deepCopy
-          columnSchema.setData_type(
-            DataTypeConverterUtil.convertToThriftDataType(
-              alterTableDataTypeChangeModel.dataTypeInfo.dataType))
-          columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
-          columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
-          addColumnSchema = columnSchema
-        }
-      }
-      timeStamp = System.currentTimeMillis
-      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
-      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
-      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
-        .setTime_stamp(System.currentTimeMillis)
-      val schemaConverter = new ThriftWrapperSchemaConverterImpl
-      val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
-      val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
-        carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession)
-      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-        .alterColumnChangeDataType(tableIdentifier, schemaParts, cols)
-      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
-      val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
-        new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,
-          alterTableDataTypeChangeModel)
-      OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
-      LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
-    } catch {
-      case e: Exception =>
-        if (carbonTable != null) {
-          AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
-        }
-        throwMetadataException(dbName, tableName,
-          s"Alter table data type change operation failed: ${e.getMessage}")
-    } finally {
-      // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks)
-    }
-    Seq.empty
-  }
-
-  /**
-   * This method will validate a column for its data type and check whether the column data type
-   * can be modified and update if conditions are met.
-   */
-  private def validateColumnDataType(
-      dataTypeInfo: DataTypeInfo,
-      carbonColumn: CarbonColumn): Unit = {
-    carbonColumn.getDataType.getName match {
-      case "INT" =>
-        if (!dataTypeInfo.dataType.equals("bigint") && !dataTypeInfo.dataType.equals("long")) {
-          sys.error(s"Given column ${ carbonColumn.getColName } with data type " +
-                    s"${carbonColumn.getDataType.getName} cannot be modified. " +
-                    s"Int can only be changed to bigInt or long")
-        }
-      case "DECIMAL" =>
-        if (!dataTypeInfo.dataType.equals("decimal")) {
-          sys.error(s"Given column ${ carbonColumn.getColName } with data type" +
-                    s" ${ carbonColumn.getDataType.getName} cannot be modified." +
-                    s" Decimal can be only be changed to Decimal of higher precision")
-        }
-        if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) {
-          sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " +
-                    s"Specified precision value ${dataTypeInfo.precision} should be " +
-                    s"greater than current precision value " +
-                    s"${carbonColumn.getColumnSchema.getPrecision}")
-        } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) {
-          sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " +
-                    s"Specified scale value ${dataTypeInfo.scale} should be greater or " +
-                    s"equal to current scale value ${carbonColumn.getColumnSchema.getScale}")
-        } else {
-          // difference of precision and scale specified by user should not be less than the
-          // difference of already existing precision and scale else it will result in data loss
-          val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision -
-                                               carbonColumn.getColumnSchema.getScale
-          val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale
-          if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) {
-            sys.error(s"Given column ${carbonColumn.getColName} cannot be modified. " +
-                      s"Specified precision and scale values will lead to data loss")
-          }
-        }
-      case _ =>
-        sys.error(s"Given column ${carbonColumn.getColName} with data type " +
-                  s"${carbonColumn.getDataType.getName} cannot be modified. " +
-                  s"Only Int and Decimal data types are allowed for modification")
-    }
-  }
-
-  override protected def opName: String = "ALTER TABLE CHANGE DATA TYPE"
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 3c9e538..40a8fd5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -127,13 +127,15 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           throw new MalformedCarbonCommandException(
             "Operation not allowed : " + altertablemodel.alterSql)
         }
-      case dataTypeChange@CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
+      case colRenameDataTypeChange@CarbonAlterTableColRenameDataTypeChangeCommand(
+      alterTableColRenameAndDataTypeChangeModel, _) =>
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-          .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
-            alterTableChangeDataTypeModel.databaseName))(sparkSession)
+          .tableExists(TableIdentifier(alterTableColRenameAndDataTypeChangeModel.tableName,
+            alterTableColRenameAndDataTypeChangeModel.databaseName))(sparkSession)
         if (isCarbonTable) {
-          val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName,
-            alterTableChangeDataTypeModel.tableName)(sparkSession)
+          val carbonTable = CarbonEnv
+            .getCarbonTable(alterTableColRenameAndDataTypeChangeModel.databaseName,
+              alterTableColRenameAndDataTypeChangeModel.tableName)(sparkSession)
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
@@ -141,7 +143,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
             throw new MalformedCarbonCommandException(
               "Unsupported operation on non transactional table")
           } else {
-            ExecutedCommandExec(dataTypeChange) :: Nil
+            ExecutedCommandExec(colRenameDataTypeChange) :: Nil
           }
         } else {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 0849634..50bb875 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command.AlterTableRenameCommand
 import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand}
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
@@ -54,10 +54,14 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
           new TableIdentifier(model.tableName, model.databaseName),
           "Alter table drop column")
         Nil
-      case CarbonAlterTableDataTypeChangeCommand(model) =>
+      case CarbonAlterTableColRenameDataTypeChangeCommand(model, _) =>
+        val operation = if (model.isColumnRename) {
+          "Alter table column rename"
+        } else {
+          "Alter table change datatype"
+        }
         rejectIfStreamingTable(
-          new TableIdentifier(model.tableName, model.databaseName),
-          "Alter table change datatype")
+          new TableIdentifier(model.tableName, model.databaseName), operation)
         Nil
       case AlterTableRenameCommand(oldTableIdentifier, _, _) =>
         rejectIfStreamingTable(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4ce4459..f50c240 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapRebuildCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
 import org.apache.spark.sql.execution.command.management._
 import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropPartitionCommand, CarbonAlterTableSplitPartitionCommand}
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableColRenameDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
@@ -83,7 +83,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
 
   protected lazy val restructure: Parser[LogicalPlan] =
-    alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
+    alterTableColumnRenameAndModifyDataType | alterTableDropColumn | alterTableAddColumns
 
   protected lazy val alterPartition: Parser[LogicalPlan] =
     alterAddPartition | alterSplitPartition | alterDropPartition
@@ -511,22 +511,26 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
 
-  protected lazy val alterTableModifyDataType: Parser[LogicalPlan] =
+  protected lazy val alterTableColumnRenameAndModifyDataType: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ CHANGE ~ ident ~ ident ~
     ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
       case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
-        // both the column names should be same
+
+        var isColumnRename = false
+        // If both the column name are not same, then its a call for column rename
         if (!columnName.equalsIgnoreCase(columnNameCopy)) {
-          throw new MalformedCarbonCommandException(
-            "Column names provided are different. Both the column names should be same")
+          isColumnRename = true
         }
-        val alterTableChangeDataTypeModel =
-          AlterTableDataTypeChangeModel(parseDataType(dataType.toLowerCase, values),
+        val alterTableColRenameAndDataTypeChangeModel =
+          AlterTableDataTypeChangeModel(parseDataType(dataType.toLowerCase,
+            values,
+            isColumnRename),
             convertDbNameToLowerCase(dbName),
             table.toLowerCase,
             columnName.toLowerCase,
-            columnNameCopy.toLowerCase)
-        CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+            columnNameCopy.toLowerCase,
+            isColumnRename)
+        CarbonAlterTableColRenameDataTypeChangeCommand(alterTableColRenameAndDataTypeChangeModel)
     }
 
   protected lazy val alterTableAddColumns: Parser[LogicalPlan] =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 756a1ad..5fc1993 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -249,11 +249,11 @@ object AlterTableUtil {
    * @param timeStamp
    * @param sparkSession
    */
-  def revertDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
+  def revertColumnRenameAndDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
     (sparkSession: SparkSession): Unit = {
-    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
+    val thriftTable: TableInfo = metaStore.getThriftTableInfo(carbonTable)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -269,13 +269,52 @@ object AlterTableUtil {
           }
         }
       }
-      metastore
+      metaStore
         .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
           thriftTable, carbonTable.getAbsoluteTableIdentifier)(sparkSession)
     }
   }
 
   /**
+   * Rewrites the table property values that refer to a renamed column.
+   *
+   * Every property value is read as a comma separated list. An entry of that list that equals
+   * the old column name (ignoring case and surrounding blanks) is replaced by the new column
+   * name. Values that only contain the old name as a substring, for example "city_id" when
+   * "id" is renamed, are left untouched.
+   *
+   * @param tableProperties table properties of the table, updated in place
+   * @param oldColumnName   column name before the rename
+   * @param newColumnName   column name after the rename
+   */
+  def modifyTablePropertiesAfterColumnRename(
+      tableProperties: mutable.Map[String, String],
+      oldColumnName: String,
+      newColumnName: String): Unit = {
+    def isOldColumn(entry: String): Boolean = entry.trim.equalsIgnoreCase(oldColumnName)
+
+    // collect all replacements first, so the map is not written to while it is traversed
+    val renamedProperties = tableProperties.toList.flatMap { case (propertyKey, propertyValue) =>
+      // limit -1 keeps trailing empty entries, so untouched parts are reproduced exactly
+      val entries = if (propertyValue == null) Array.empty[String] else propertyValue.split(",", -1)
+      if (entries.exists(isOldColumn)) {
+        val renamedEntries = entries.map { entry =>
+          if (isOldColumn(entry)) newColumnName else entry
+        }
+        List(propertyKey -> renamedEntries.mkString(","))
+      } else {
+        Nil
+      }
+    }
+    renamedProperties.foreach { case (propertyKey, renamedValue) =>
+      tableProperties.put(propertyKey, renamedValue)
+    }
+  }
+
+  /**
+   * Builds a new SchemaEvolutionEntry that records one column change: the given new column
+   * schema is set as added and the given old column schema is set as removed.
+   *
+   * The entry is only returned and nothing else is modified, so callers have to use the
+   * return value.
+   *
+   * @param schemaEvolutionEntry not used by this method, kept so that existing callers compile
+   * @param timeStamp            time stamp the new entry is created with
+   * @param addColumnSchema      column schema after the change
+   * @param deletedColumnSchema  column schema before the change
+   * @return the newly created SchemaEvolutionEntry
+   */
+  def addNewSchemaEvolutionEntry(
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      timeStamp: Long,
+      addColumnSchema: org.apache.carbondata.format.ColumnSchema,
+      deletedColumnSchema: org.apache.carbondata.format.ColumnSchema): SchemaEvolutionEntry = {
+    // a distinct local name avoids shadowing the unused schemaEvolutionEntry parameter
+    val newEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+    newEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+    newEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+    newEvolutionEntry
+  }
+
+  /**
    * This method add/modify the table comments.
    *
    * @param tableIdentifier

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
index 4985718..8578132 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala
@@ -548,11 +548,16 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll with
          | USING 'bloomfilter'
          | DMProperties( 'INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
       """.stripMargin)
-    val exception: MalformedCarbonCommandException = intercept[MalformedCarbonCommandException] {
+    val changeDataTypeException: MalformedCarbonCommandException = intercept[MalformedCarbonCommandException] {
       sql(s"ALTER TABLE $normalTable CHANGE id id bigint")
     }
-    assert(exception.getMessage.contains(
+    assert(changeDataTypeException.getMessage.contains(
       "alter table change datatype is not supported for index datamap"))
+    val columnRenameException: MalformedCarbonCommandException = intercept[MalformedCarbonCommandException] {
+      sql(s"ALTER TABLE $normalTable CHANGE id test int")
+    }
+    assert(columnRenameException.getMessage.contains(
+      "alter table column rename is not supported for index datamap"))
   }
 
   test("test drop index columns for bloomfilter datamap") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index 5096089..f57c593 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -186,10 +186,14 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
     val changeDataTypeException = intercept[MalformedCarbonCommandException] {
       sql("""ALTER TABLE source CHANGE c2 c2 bigint""").collect()
     }
+    val columnRenameException = intercept[MalformedCarbonCommandException] {
+      sql("""ALTER TABLE source CHANGE c2 c3 int""").collect()
+    }
     assertResult("Alter table add column is not allowed for streaming table")(addColException.getMessage)
     assertResult("Alter table drop column is not allowed for streaming table")(dropColException.getMessage)
     assertResult("Alter rename table is not allowed for streaming table")(renameException.getMessage)
     assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage)
+    assertResult("Alter table column rename is not allowed for streaming table")(columnRenameException.getMessage)
   }
 
   override def  afterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
new file mode 100644
index 0000000..710cd0d
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata.restructure.vectorreader
+
+import org.apache.spark.sql.common.util.Spark2QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
+class AlterTableColumnRenameTestCase extends Spark2QueryTest with BeforeAndAfterAll {
+
+  // Runs once before the suite: removes any existing `rename` table and then recreates
+  // and loads it, so the first test starts from a populated table.
+  override def beforeAll(): Unit = {
+    dropTable()
+    createTableAndLoad()
+  }
+
+  // Renames empname to empAddress while keeping the string datatype, then checks through
+  // CarbonMetadata that only the new column name can be looked up.
+  test("test only column rename operation") {
+    sql("alter table rename change empname empAddress string")
+    val renamedTable = CarbonMetadata.getInstance().getCarbonTable("default", "rename")
+    val newColumn = renamedTable.getColumnByName("rename", "empAddress")
+    val oldColumn = renamedTable.getColumnByName("rename", "empname")
+    assert(newColumn != null)
+    assert(oldColumn == null)
+  }
+
+  // Combines a rename with a datatype change in one statement: the empname -> Bigint change
+  // is expected to fail, while deptno -> classNo as Bigint is expected to succeed.
+  test("test only column rename operation with datatype change also") {
+    dropTable()
+    createTable()
+    intercept[ProcessMetaDataException] {
+      sql("alter table rename change empname empAddress Bigint")
+    }
+    sql("alter table rename change deptno classNo Bigint")
+    val alteredTable = CarbonMetadata.getInstance().getCarbonTable("default", "rename")
+    val renamedColumn = alteredTable.getColumnByName("rename", "classNo")
+    val previousColumn = alteredTable.getColumnByName("rename", "deptno")
+    assert(renamedColumn != null)
+    assert(previousColumn == null)
+  }
+
+  // Renaming a column that is not part of the table must fail with a descriptive message.
+  test("test trying to rename column which does not exists") {
+    dropTable()
+    createTable()
+    val failure = intercept[ProcessMetaDataException] {
+      sql("alter table rename change carbon empAddress Bigint")
+    }
+    val failureMessage = failure.getMessage
+    assert(failureMessage.contains("Column does not exist: carbon"))
+  }
+
+  // Renaming a column to a name that another column already uses must be rejected.
+  test("test rename when new column name already in schema") {
+    dropTable()
+    createTable()
+    val failure = intercept[ProcessMetaDataException] {
+      sql("alter table rename change empname workgroupcategoryname string")
+    }
+    val failureMessage = failure.getMessage
+    assert(failureMessage.contains("New column name workgroupcategoryname already exists in table rename"))
+  }
+
+  // Renames a timestamp column and an int column and checks, for each of them, that the new
+  // name resolves and the old name no longer does.
+  test("column rename for different datatype"){
+    dropTable()
+    createTable()
+    sql("alter table rename change projectenddate newDate Timestamp")
+    sql("alter table rename change workgroupcategory newCategory int")
+    val alteredTable = CarbonMetadata.getInstance().getCarbonTable("default", "rename")
+    // (new name, old name) pairs, checked in the same order as the ALTER statements above.
+    Seq("newDate" -> "projectenddate", "newCategory" -> "workgroupcategory").foreach {
+      case (newName, oldName) =>
+        assert(null != alteredTable.getColumnByName("rename", newName))
+        assert(null == alteredTable.getColumnByName("rename", oldName))
+    }
+  }
+
+  // Collects projection and filter results before and after renaming empname, and checks
+  // that the row counts match and that the old column name can no longer be queried.
+  test("query count after column rename and filter results"){
+    dropTable()
+    createTableAndLoad()
+    val namesBefore = sql("select empname from rename").collect()
+    val filteredBefore = sql("select workgroupcategory from rename where empname = 'bill' or empname = 'sibi'").collect()
+    sql("alter table rename change empname empAddress string")
+    val namesAfter = sql("select empAddress from rename").collect()
+    val filteredAfter = sql("select workgroupcategory from rename where empAddress = 'bill' or empAddress = 'sibi'").collect()
+    // The old name is gone, so building the query is expected to throw.
+    intercept[Exception] {
+      sql("select empname from rename")
+    }
+    assert(namesBefore.length == namesAfter.length)
+    assert(filteredBefore.length == filteredAfter.length)
+  }
+
+  // Checks that the row count is unchanged after renaming two columns (one of them with a
+  // datatype change) and running a minor compaction over four loads of the same file.
+  test("compaction after column rename and count"){
+    dropTable()
+    createTableAndLoad()
+    // Three additional loads on top of the one done by createTableAndLoad().
+    (0 to 2).foreach(_ => loadToTable())
+    // Materialise the baseline count right away: a DataFrame is lazy, so calling count()
+    // only after the ALTER statements would run the old-column query after the schema change
+    // instead of capturing the state before it.
+    val countBeforeRename = sql("select empname,deptno from rename").count()
+    sql("alter table rename change empname empAddress string")
+    sql("alter table rename change deptno classNo Bigint")
+    sql("alter table rename compact 'minor'")
+    val countAfterCompaction = sql("select empAddress,classNo from rename").count()
+    assert(countBeforeRename == countAfterCompaction)
+  }
+
+  // Walks a column through add -> rename -> drop and checks the table metadata after each
+  // step; renaming the dropped column afterwards must fail.
+  test("test rename after adding column and drop column") {
+    dropTable()
+    createTableAndLoad()
+    sql("alter table rename add columns(newAdded string)")
+    val tableAfterAdd = CarbonMetadata.getInstance().getCarbonTable("default", "rename")
+    assert(tableAfterAdd.getColumnByName("rename", "newAdded") != null)
+    sql("alter table rename change newAdded addedRename string")
+    val tableAfterRename = CarbonMetadata.getInstance().getCarbonTable("default", "rename")
+    assert(tableAfterRename.getColumnByName("rename", "addedRename") != null)
+    assert(tableAfterRename.getColumnByName("rename", "newAdded") == null)
+    sql("alter table rename drop columns(addedRename)")
+    val tableAfterDrop = CarbonMetadata.getInstance().getCarbonTable("default", "rename")
+    assert(tableAfterDrop.getColumnByName("rename", "addedRename") == null)
+    intercept[ProcessMetaDataException] {
+      sql("alter table rename change addedRename test string")
+    }
+  }
+
+  // Exercises update, insert and delete around successive renames of the same column
+  // (empname -> name -> empname -> newname) and compares row counts across the renames.
+  test("test column rename and update and insert and delete") {
+    dropTable()
+    createTableAndLoad()
+    sql("alter table rename change empname name string")
+    // NOTE(review): workgroupcategory is declared as int in the table DDL, so this string
+    // filter may match no rows and leave both 'joey' counts below at zero - confirm whether
+    // workgroupcategoryname was intended.
+    sql("update rename set (name) = ('joey') where workgroupcategory = 'developer'").show()
+    sql("insert into rename select 20,'bill','PM','01-12-2015',3,'manager',14,'Learning',928479,'01-01-2016','30-11-2016',75,94,13547")
+    // Counts are materialised immediately instead of keeping lazy DataFrames around, so each
+    // value really reflects the table state before the next rename.
+    val joeyCountAsName = sql("select * from rename where name = 'joey'").count()
+    sql("alter table rename change name empname string")
+    val joeyCountAsEmpname = sql("select * from rename where empname = 'joey'").count()
+    assert(joeyCountAsName == joeyCountAsEmpname)
+    sql("delete from rename where empname = 'joey'")
+    val rowsBeforeLastRename = sql("select empname from rename").count()
+    sql("alter table rename change empname newname string")
+    // The old column name must no longer be accepted in a delete filter.
+    intercept[Exception] {
+      sql("delete from rename where empname = 'joey'")
+    }
+    val rowsAfterLastRename = sql("select newname from rename").count()
+    assert(rowsBeforeLastRename == rowsAfterLastRename)
+  }
+
+  // Renames columns that are referenced by table properties, using mixed-case names in the
+  // ALTER statements, and checks the values shown by DESCRIBE FORMATTED afterwards.
+  test("test sort columns, local dictionary and other column properties in DESC formatted, check case sensitive also") {
+    dropTable()
+    sql(
+      "CREATE TABLE rename (empno int, empname String, designation String, doj Timestamp, " +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
+      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
+      "utilization int,salary int) STORED BY 'org.apache.carbondata.format' tblproperties('dictionary_include'='empno,empname'," +
+      "'local_dictionary_include'='workgroupcategoryname','local_dictionary_exclude'='deptname','COLUMN_META_CACHE'='projectcode,attendance'," +
+      "'SORT_COLUMNS'='workgroupcategory,utilization,salary')")
+    Seq(
+      "alter table rename change eMPName name string",
+      "alter table rename change workgroupcategoryname workgroup string",
+      "alter table rename change DEPtNaMe depTADDress string",
+      "alter table rename change attEnDance bUNk int",
+      "alter table rename change uTiLIZation utILIty int"
+    ).foreach(statement => sql(statement))
+
+    val description = sql("describe formatted rename").collect
+    // Passes only when a row whose first cell contains `property` exists and its second
+    // cell contains `expected`; a missing row fails the assertion as well.
+    def assertDescribed(property: String, expected: String): Unit = {
+      val matchingRow = description.find(_.get(0).toString.contains(property))
+      assert(matchingRow.exists(_.get(1).toString.contains(expected)))
+    }
+    assertDescribed("Global Dictionary", "empno,name")
+    assertDescribed("Local Dictionary Include", "workgroup")
+    assertDescribed("Local Dictionary Exclude", "name,designation,deptaddress")
+    assertDescribed("Sort Columns", "workgroupcategory, utility, salary")
+    assertDescribed("Cached Min/Max Index Columns", "projectcode, bunk")
+  }
+
+  // Renaming a partition column must be rejected with an explanatory message.
+  test("test rename on partition column") {
+    sql("drop table if exists partitiontwo")
+    sql(
+      """
+        | CREATE TABLE partitiontwo (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (doj Timestamp, empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val ex = intercept[ProcessMetaDataException] {
+      sql("alter table partitiontwo change empname name string")
+    }
+    // The contains() result used to be discarded, so the message was never verified.
+    assert(ex.getMessage.contains("Renaming the partition column name is not allowed"))
+    // Do not leave the partitioned table behind for later tests or suites.
+    sql("drop table if exists partitiontwo")
+  }
+
+  // Renaming a column that is an index column of a lucene datamap must be rejected.
+  test("test rename column with lucene") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('INDEX_COLUMNS'='Name , cIty')
+      """.stripMargin)
+    val ex = intercept[ProcessMetaDataException] {
+      sql("alter table datamap_test change Name myName string")
+    }
+    // The contains() result used to be discarded, so the message was never verified.
+    assert(ex.getMessage.contains("alter table column rename is not supported for index datamap"))
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+
+  // Renaming a column that is an index column of a bloomfilter datamap must be rejected.
+  test("test rename column with bloom datamap") {
+    sql("DROP TABLE IF EXISTS bloomtable")
+    sql(
+      s"""
+         | CREATE TABLE bloomtable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128', 'sort_columns'='id')
+         |  """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm3 ON TABLE bloomtable
+         | USING 'bloomfilter'
+         | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000')
+      """.stripMargin)
+    val ex = intercept[ProcessMetaDataException] {
+      sql("alter table bloomtable change city nation string")
+    }
+    // The contains() result used to be discarded, so the message was never verified.
+    assert(ex.getMessage.contains("alter table column rename is not supported for index datamap"))
+    sql("drop table if exists bloomtable")
+  }
+
+  // Renaming a column of a main table that has a preaggregate datamap is expected to throw.
+  test("test rename column on table where preagg exists") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    // Single-line statement without interpolation or margin characters, so a plain literal
+    // produces exactly the same text.
+    val createPreAggregate =
+      "create datamap preagg_avg on table maintable using 'preaggregate' as select id,avg(age) from maintable group by id"
+    sql(createPreAggregate)
+    intercept[Exception] {
+      sql("alter table maintable change id ids int")
+    }
+    sql("DROP TABLE IF EXISTS maintable")
+  }
+
+  // Renaming a struct column must be rejected with a message naming the column.
+  test("test rename on complex column") {
+    sql("drop table if exists complex")
+    sql(
+      "create table complex (id int, name string, structField struct<intval:int, stringval:string>) stored by 'carbondata'")
+    val ex = intercept[ProcessMetaDataException] {
+      sql("alter table complex change structField complexTest struct")
+    }
+    assert(ex.getMessage.contains("Rename column is unsupported for complex datatype column structfield"))
+    // Do not leave the table behind for later tests or suites.
+    sql("drop table if exists complex")
+  }
+
+  // After a rename, SET TBLPROPERTIES must reject the old column name and accept the new
+  // one, and DESCRIBE FORMATTED must then show the new name for the cached columns.
+  test("test SET command with column rename") {
+    dropTable()
+    createTable()
+    sql("alter table rename change workgroupcategoryname testset string")
+    val oldNameFailure = intercept[Exception] {
+      sql("alter table rename set tblproperties('column_meta_cache'='workgroupcategoryname')")
+    }
+    assert(oldNameFailure.getMessage.contains("Column workgroupcategoryname does not exists in the table rename"))
+    sql("alter table rename set tblproperties('column_meta_cache'='testset')")
+    val description = sql("describe formatted rename").collect
+    val cachedColumnsRow = description.find(_.get(0).toString.contains("Cached Min/Max Index Columns"))
+    // A missing row fails the assertion, exactly like a row with an unexpected value.
+    assert(cachedColumnsRow.exists(_.get(1).toString.contains("testset")))
+  }
+
+  // Renames a decimal column while changing it from decimal(30,10) to decimal(32,11) and
+  // checks the datatype reported by DESCRIBE FORMATTED for the new name.
+  test("test column rename with change datatype for decimal datatype") {
+    sql("drop table if exists deciTable")
+    sql("create table decitable(name string, age int, avg decimal(30,10)) stored by 'carbondata'")
+    sql("alter table decitable change avg newAvg decimal(32,11)")
+    val description = sql("describe formatted decitable").collect
+    val renamedColumnRow = description.find(_.get(0).toString.contains("newavg"))
+    // A missing row fails the assertion, exactly like a row with an unexpected datatype.
+    assert(renamedColumnRow.exists(_.get(1).toString.contains("decimal(32,11)")))
+    sql("drop table if exists decitable")
+  }
+
+  // Renames a bigint column without changing its datatype and checks the table metadata.
+  test("test column rename of bigint column") {
+    sql("drop table if exists biginttable")
+    sql("create table biginttable(name string, age int, bigintfield bigint) stored by 'carbondata'")
+    sql("alter table biginttable change bigintfield testfield bigint")
+    val alteredTable = CarbonMetadata.getInstance().getCarbonTable("default", "biginttable")
+    val renamedColumn = alteredTable.getColumnByName("biginttable", "testfield")
+    val previousColumn = alteredTable.getColumnByName("biginttable", "bigintfield")
+    assert(renamedColumn != null)
+    assert(previousColumn == null)
+    sql("drop table if exists biginttable")
+  }
+
+  // Runs once after the suite: drops every table the tests create, including any that a
+  // test could not drop itself because it aborted before reaching its own cleanup.
+  override def afterAll(): Unit = {
+    dropTable()
+    Seq("partitiontwo", "datamap_test", "bloomtable", "maintable", "complex", "decitable",
+      "biginttable").foreach { leftover =>
+      sql(s"DROP TABLE IF EXISTS $leftover")
+    }
+  }
+
+  /** Drops the shared `rename` table if it exists. */
+  def dropTable(): Unit = {
+    val dropStatement = "DROP TABLE IF EXISTS RENAME"
+    sql(dropStatement)
+  }
+
+  /** Creates the shared `rename` table and performs one load of data.csv into it. */
+  def createTableAndLoad(): Unit = {
+    // Reuse the sibling helpers instead of repeating the same DDL and LOAD statement here.
+    createTable()
+    loadToTable()
+  }
+
+  /** Loads data.csv from the test resources directory into the `rename` table. */
+  def loadToTable(): Unit = {
+    val loadStatement =
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE rename OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin
+    sql(loadStatement)
+  }
+
+  /** Creates the shared `rename` table without any table properties and without data. */
+  def createTable(): Unit = {
+    val createStatement =
+      "CREATE TABLE rename (empno int, empname String, designation String, doj Timestamp, " +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
+      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
+      "utilization int,salary int) STORED BY 'org.apache.carbondata.format'"
+    sql(createStatement)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/34923db0/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
index 4bf7de0..dcf7e6d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
@@ -161,7 +161,7 @@ class ChangeDataTypeTestCases extends Spark2QueryTest with BeforeAndAfterAll {
     }.getMessage.contains("exists in a pre-aggregate table"))
     assert(intercept[ProcessMetaDataException] {
       sql("alter table preaggmain_preagg1 change a a long").show
-    }.getMessage.contains("Cannot change data type for columns in pre-aggregate table"))
+    }.getMessage.contains("Cannot change data type or rename column for columns in pre-aggregate table"))
     sql("drop table if exists preaggMain")
     sql("drop table if exists PreAggMain_preagg1")
   }