Posted to commits@carbondata.apache.org by in...@apache.org on 2021/06/10 07:56:37 UTC

[carbondata] branch master updated: [CARBONDATA-4179] Support renaming of complex columns (array/struct)

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new cfa02dd  [CARBONDATA-4179] Support renaming of complex columns (array/struct)
cfa02dd is described below

commit cfa02dd3db2906750aeef2ebc657a1c4f58b2d66
Author: akkio-97 <ak...@gmail.com>
AuthorDate: Tue May 4 11:30:23 2021 +0530

    [CARBONDATA-4179] Support renaming of complex columns (array/struct)
    
    Why is this PR needed?
    This PR enables renaming of complex columns, both parent and child columns, including nested levels.
    example: if the schema contains the columns str1 struct<a:int, b:string> and arr1 array<long>
    1. Rename the struct column: alter table <table_name> change str1 str2 struct<a:int, b:string>
    2. Rename the array column: alter table <table_name> change arr1 arr2 array<long>
    3. Rename the parent as well as a child: alter table <table_name> change str1 str2 struct<abc:int, b:string>
    NOTE: The rename operation fails if the structure of the complex column has been altered.
    This check ensures that the old and new columns are compatible with each other, meaning
    the number of children and the nesting levels must be unaltered while attempting the rename.
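
    For illustration, a minimal sketch (hypothetical table name) contrasting a rename that
    passes this compatibility check with one that fails because the child count changed:

        sql("CREATE TABLE t (str1 struct<a:int, b:string>) STORED AS carbondata")
        // compatible: same children and nesting levels, only names change
        sql("alter table t change str1 str2 struct<abc:int, b:string>")
        // incompatible: a third child is added, so the rename is rejected
        sql("alter table t change str2 str3 struct<abc:int, b:string, c:int>")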
    
    What changes were proposed in this PR?
    1. Parse the incoming new complex type and create a nested DataTypeInfo structure
       (sketched after this list).
    2. Pass this DataTypeInfo on to the AlterTableDataTypeChangeModel.
    3. Validate compatibility and duplicate column names there.
    4. Add the parent column to the schema evolution entry.
    5. Update the Spark catalog table.
    Limitation: renaming is not yet supported for Map types.
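
    A rough sketch of the nested DataTypeInfo built for a struct column by the new
    parseColumn helper in CarbonParserUtil (illustrative values, not exact output):

        import org.apache.spark.sql.types._
        val info = CarbonParserUtil.parseColumn("str1",
          StructType(Seq(StructField("a", IntegerType), StructField("b", StringType))), None)
        // info.columnName == "str1", info.dataType == "struct"
        // info.getChildren() == List(DataTypeInfo("str1.a", "int"),
        //                            DataTypeInfo("str1.b", "string"))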
    
    Does this PR introduce any user interface change?
    Yes
    
    Is any new testcase added?
    Yes
    
    This closes #4129
---
 .../core/constants/CarbonCommonConstants.java      |   3 +
 .../core/scan/executor/util/RestructureUtil.java   |  59 +++-
 docs/ddl-of-carbondata.md                          |  29 +-
 .../TestSIWithComplexArrayType.scala               |  20 +-
 .../spark/sql/catalyst/CarbonParserUtil.scala      |  64 ++++-
 .../command/carbonTableSchemaCommon.scala          |  11 +-
 ...nAlterTableColRenameDataTypeChangeCommand.scala | 214 +++++++++------
 .../spark/sql/execution/strategy/DDLHelper.scala   |  10 +-
 .../spark/sql/hive/SqlAstBuilderHelper.scala       |   2 +-
 .../sql/parser/CarbonSparkSqlParserUtil.scala      |   2 +-
 .../org/apache/spark/util/AlterTableUtil.scala     | 127 ++++++++-
 .../AlterTableColumnRenameTestCase.scala           | 299 ++++++++++++++++++++-
 12 files changed, 693 insertions(+), 147 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a187f19..597cfcc 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1933,6 +1933,9 @@ public final class CarbonCommonConstants {
   public static final String MAP = "map";
   public static final String DECIMAL = "decimal";
   public static final String FROM = "from";
+  public static final String BIGINT = "bigint";
+  public static final String LONG = "long";
+  public static final String INT = "int";
 
   /**
    * TABLE UPDATE STATUS FILENAME
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 7fabe17..0710920 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -21,7 +21,9 @@ import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -50,6 +52,10 @@ import org.apache.commons.lang3.ArrayUtils;
  * Utility class for restructuring
  */
 public class RestructureUtil {
+  // if the table column is of complex type, this lookup stores the column ids of the parent
+  // (as well as its children) [tableColumn_id -> tableColumn_name]. This helps determine the
+  // existence of an incoming query column by matching on column id.
+  private static Map<String, String> existingTableColumnIDMap;
 
   /**
    * Below method will be used to get the updated query dimension update
@@ -159,11 +165,21 @@ public class RestructureUtil {
     return presentDimension;
   }
 
+  public static void fillExistingTableColumnIDMap(CarbonDimension tableColumn) {
+    existingTableColumnIDMap.put(tableColumn.getColumnId(), tableColumn.getColName());
+    List<CarbonDimension> children = tableColumn.getListOfChildDimensions();
+    if (children == null) return;
+    for (CarbonDimension dimension : children) {
+      fillExistingTableColumnIDMap(dimension);
+    }
+  }
+
   /**
    * Match the columns for transactional and non transactional tables
    * @param isTransactionalTable
-   * @param queryColumn
-   * @param tableColumn
+   * @param queryColumn - column entity that is present in the fired query or in the query model.
+   * @param tableColumn - column entity that is present in the table block or in the segment
+   *                      properties.
    * @return
    */
   public static boolean isColumnMatches(boolean isTransactionalTable,
@@ -177,6 +193,12 @@ public class RestructureUtil {
           .isColumnMatchBasedOnId(queryColumn)) {
         return true;
       } else {
+        if (tableColumn instanceof CarbonDimension) {
+          // insert the table column ids into a lookup map, which will later be used to
+          // match against the query column id
+          existingTableColumnIDMap = new HashMap<>();
+          fillExistingTableColumnIDMap((CarbonDimension) tableColumn);
+        }
         return isColumnMatchesStruct(tableColumn, queryColumn);
       }
     } else {
@@ -191,26 +213,39 @@ public class RestructureUtil {
   }
 
   /**
-   * In case of Multilevel Complex column - Struct/StructOfStruct, traverse all the child dimension
-   * to check column Id
+   * In case of a multilevel complex column - Struct/StructOfStruct, traverse all the child
+   * dimensions of tableColumn to check whether any of their column ids match that of queryColumn.
    *
-   * @param tableColumn
-   * @param queryColumn
+   * @param tableColumn - column entity that is present in the table block or in the segment
+   *                      properties.
+   * @param queryColumn - column entity that is present in the fired query or in the query model.
+   * tableColumn name and queryColumn name may or may not be the same if the schema has evolved.
+   * Hence, matching happens based on the column id.
    * @return
    */
   private static boolean isColumnMatchesStruct(CarbonColumn tableColumn, CarbonColumn queryColumn) {
     if (tableColumn instanceof CarbonDimension) {
-      List<CarbonDimension> parentDimension =
+      List<CarbonDimension> childrenDimensions =
           ((CarbonDimension) tableColumn).getListOfChildDimensions();
-      CarbonDimension carbonDimension = null;
+      CarbonDimension carbonDimension;
       String[] colSplits = queryColumn.getColName().split("\\.");
       StringBuffer tempColName = new StringBuffer(colSplits[0]);
       for (String colSplit : colSplits) {
         if (!tempColName.toString().equalsIgnoreCase(colSplit)) {
-          tempColName = tempColName.append(".").append(colSplit);
+          tempColName = tempColName.append(CarbonCommonConstants.POINT).append(colSplit);
         }
-        carbonDimension = CarbonTable.getCarbonDimension(tempColName.toString(), parentDimension);
-        if (carbonDimension != null) {
+        carbonDimension =
+            CarbonTable.getCarbonDimension(tempColName.toString(), childrenDimensions);
+        if (carbonDimension == null) {
+          // Avoid returning true in case of SDK as the column name contains the id.
+          if (existingTableColumnIDMap != null && existingTableColumnIDMap
+              .containsKey(queryColumn.getColumnId())) {
+            String columnName = existingTableColumnIDMap.get(queryColumn.getColumnId());
+            if (columnName != null && !columnName.contains(queryColumn.getColumnId())) {
+              return true;
+            }
+          }
+        } else {
           // In case of SDK the columnId and columnName is same and this check will ensure for
           // all the child columns that the table column name is equal to query column name and
           // table columnId is equal to table columnName
@@ -222,7 +257,7 @@ public class RestructureUtil {
             return true;
           }
           if (carbonDimension.getListOfChildDimensions() != null) {
-            parentDimension = carbonDimension.getListOfChildDimensions();
+            childrenDimensions = carbonDimension.getListOfChildDimensions();
           }
         }
       }
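
For intuition, a hedged Scala sketch of the lookup that fillExistingTableColumnIDMap builds
(the real method above is Java and fills a static map; names follow the diff):

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    // collect (columnId -> columnName) for a complex dimension and all of its nested
    // children, so an incoming query column can be matched purely by column id even
    // after a rename
    def collectIds(dim: CarbonDimension, acc: mutable.Map[String, String]): Unit = {
      acc.put(dim.getColumnId, dim.getColName)
      val children = dim.getListOfChildDimensions
      if (children != null) {
        children.asScala.foreach(collectIds(_, acc))
      }
    }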
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 50ddf51..996f7c8 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -52,8 +52,8 @@ CarbonData DDL statements are documented here,which includes:
     * [RENAME TABLE](#rename-table)
     * [ADD COLUMNS](#add-columns)
     * [DROP COLUMNS](#drop-columns)
-    * [RENAME COLUMN](#change-column-nametype)
-    * [CHANGE COLUMN NAME/TYPE/COMMENT](#change-column-nametype)
+    * [RENAME COLUMN](#change-column-name-and-type-and-comment)
+    * [CHANGE COLUMN NAME/TYPE/COMMENT](#change-column-name-and-type-and-comment)
     * [MERGE INDEXES](#merge-index)
     * [SET/UNSET](#set-and-unset)
   * [DROP TABLE](#drop-table)
@@ -805,7 +805,7 @@ Users can specify which columns to include and exclude for local dictionary gene
      2. If a column to be dropped has any Secondary index table created on them, drop column operation fails and the user will 
      be asked to drop the corresponding SI table first before going for actual drop.
 
-   - #### CHANGE COLUMN NAME/TYPE/COMMENT
+   - #### CHANGE COLUMN NAME AND TYPE AND COMMENT
    
      This command is used to change column name and comment and the data type from INT to BIGINT or decimal precision from lower to higher.
      Change of decimal data type from lower precision to higher precision will only be supported for cases where there is no data loss.
@@ -819,33 +819,47 @@ Users can specify which columns to include and exclude for local dictionary gene
      - Invalid scenarios 
        * Change of decimal precision from (10,2) to (10,5) is invalid as in this case only scale is increased but total number of digits remains the same.
        * Change the comment of the partition column
+       * Rename operation fails if the structure of the complex column has been altered. Please ensure that the old and new columns are compatible with
+         each other, meaning the number of children and the nesting levels are unaltered while attempting the rename.
      - Valid scenarios
        * Change of decimal precision from (10,2) to (12,3) is valid as the total number of digits are increased by 2 but scale is increased only by 1 which will not lead to any data loss.
        * Change the comment of columns other than partition column
      - **NOTE:** The allowed range is 38,38 (precision, scale) and is a valid upper case scenario which is not resulting in data loss.
 
-     Example1:Change column a1's name to a2 and its data type from INT to BIGINT.
+     Example 1: Change column a1's name to a2 and its data type from INT to BIGINT.
 
      ```
      ALTER TABLE test_db.carbon CHANGE a1 a2 BIGINT
      ```
      
-     Example2:Changing decimal precision of column a1 from 10 to 18.
+     Example 2: Changing decimal precision of column a1 from 10 to 18.
 
      ```
      ALTER TABLE test_db.carbon CHANGE a1 a1 DECIMAL(18,2)
      ```
 
-     Example3:Change column a3's name to a4.
+     Example 3: Change column a3's name to a4.
 
      ```
      ALTER TABLE test_db.carbon CHANGE a3 a4 STRING
      ```
-     Example3:Change column a3's comment to "col_comment".
+     Example 4: Change column a3's comment to "col_comment".
      
      ```
      ALTER TABLE test_db.carbon CHANGE a3 a3 STRING COMMENT 'col_comment'
      ```
+     
+     Example 5: Change the child column name in column structField struct\<age:int> from age to id.
+                   
+     ```
+     ALTER TABLE test_db.carbon CHANGE structField structField struct<id:int>
+     ```
+     
+     Example 6: Rename the column oldArray array\<int> to newArray.
+          
+     ```
+     ALTER TABLE test_db.carbon CHANGE oldArray newArray array<int>
+     ```
 
      **NOTE:** Once the column is renamed, user has to take care about replacing the fileheader with the new name or changing the column header in csv file.
    
@@ -866,6 +880,7 @@ Users can specify which columns to include and exclude for local dictionary gene
      **NOTE:**
      * Merge index is supported on streaming table from carbondata 2.0.1 version.
      But streaming segments (ROW_V1) cannot create merge index.
+     * Renaming columns of MAP type is not supported.
 
 
    - #### SET and UNSET
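
Per the NOTE above, a rename that targets a map column is expected to fail (hypothetical
table; the error text matches the new check added to the command below):

    sql("CREATE TABLE t2 (m map<string,string>) STORED AS carbondata")
    // expected: UnsupportedOperationException
    // "Alter rename is unsupported for Map datatype column"
    sql("alter table t2 change m m2 map<string,string>")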
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index ef8bf3d..3fe1443 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -47,13 +47,14 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
 
   test("Test restructured array<string> and existing string column as index columns on SI with compaction") {
     sql("drop table if exists complextable")
-    sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
+    sql("create table complextable (id string, country array<string>, columnName string) stored as carbondata")
     sql("insert into complextable select 1,array('china', 'us'), 'b'")
     sql("insert into complextable select 2,array('pak'), 'v'")
 
     sql("drop index if exists index_11 on complextable")
-    sql(
-      "ALTER TABLE complextable ADD COLUMNS(arr2 array<string>)")
+    sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
+    sql("alter table complextable change newArray arr2 array<string>")
+    sql("alter table complextable change columnName name string")
     sql("insert into complextable select 3,array('china'), 'f',array('hello','world')")
     sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis')")
 
@@ -93,9 +94,10 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     sql("insert into complextable select 2,array('pak'), 'v'")
 
     sql("drop index if exists index_11 on complextable")
-    sql(
-      "ALTER TABLE complextable ADD COLUMNS(arr2 array<string>)")
-    sql("ALTER TABLE complextable ADD COLUMNS(addr string)")
+    sql("ALTER TABLE complextable ADD COLUMNS(newArray array<string>)")
+    sql("alter table complextable change newArray arr2 array<string>")
+    sql("ALTER TABLE complextable ADD COLUMNS(address string)")
+    sql("alter table complextable change address addr string")
     sql("insert into complextable select 3,array('china'), 'f',array('hello','world'),'china'")
     sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis'),'India'")
 
@@ -129,11 +131,12 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
   }
 
   test("test array<string> on secondary index with compaction") {
-    sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
+    sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
     sql("insert into complextable select 1,array('china', 'us'), 'b'")
     sql("insert into complextable select 2,array('pak'), 'v'")
     sql("insert into complextable select 3,array('china'), 'f'")
     sql("insert into complextable select 4,array('india'),'g'")
+    sql("alter table complextable change columnCountry country array<string>")
     val result1 = sql(" select * from complextable where array_contains(country,'china')")
     val result2 = sql(" select * from complextable where country[0]='china'")
     sql("drop index if exists index_1 on complextable")
@@ -162,11 +165,12 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
   }
 
   test("test array<string> and string as index columns on secondary index with compaction") {
-    sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
+    sql("create table complextable (id string, columnCountry array<string>, name string) stored as carbondata")
     sql("insert into complextable select 1, array('china', 'us'), 'b'")
     sql("insert into complextable select 2, array('pak'), 'v'")
     sql("insert into complextable select 3, array('china'), 'f'")
     sql("insert into complextable select 4, array('india'),'g'")
+    sql("alter table complextable change columnCountry country array<string>")
     val result = sql(" select * from complextable where array_contains(country,'china') and name='f'")
     sql("drop index if exists index_1 on complextable")
     sql("create index index_1 on table complextable(country, name) as 'carbondata'")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index f41f2bf..f857355 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.constants.LoggerAction
@@ -1092,15 +1093,15 @@ object CarbonParserUtil {
   private def appendParentForEachChild(field: Field, parentName: String): Field = {
     field.dataType.getOrElse("NIL") match {
       case "Array" | "Struct" | "Map" =>
-        val newChildren = field.children
-          .map(_.map(appendParentForEachChild(_, parentName + "." + field.column)))
-        field.copy(column = parentName + "." + field.column,
-          name = Some(parentName + "." + field.name.getOrElse(None)),
+        val newChildren = field.children.map(_.map(appendParentForEachChild(_,
+          parentName + CarbonCommonConstants.POINT + field.column)))
+        field.copy(column = parentName + CarbonCommonConstants.POINT + field.column,
+          name = Some(parentName + CarbonCommonConstants.POINT + field.name.getOrElse(None)),
           children = newChildren,
           parent = parentName)
       case _ =>
-        field.copy(column = parentName + "." + field.column,
-          name = Some(parentName + "." + field.name.getOrElse(None)),
+        field.copy(column = parentName + CarbonCommonConstants.POINT + field.column,
+          name = Some(parentName + CarbonCommonConstants.POINT + field.name.getOrElse(None)),
           parent = parentName)
     }
   }
@@ -1113,6 +1114,7 @@ object CarbonParserUtil {
    * @return DataTypeInfo object with datatype, precision and scale
    */
   def parseDataType(
+      columnName: String,
       dataType: String,
       values: Option[List[(Int, Int)]]): DataTypeInfo = {
     var precision: Int = 0
@@ -1122,7 +1124,8 @@ object CarbonParserUtil {
         if (values.isDefined) {
           throw new MalformedCarbonCommandException("Invalid data type")
         }
-        DataTypeInfo(DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
+        DataTypeInfo(columnName,
+          DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
       case "decimal" =>
         if (values.isDefined) {
           precision = values.get(0)._1
@@ -1136,10 +1139,53 @@ object CarbonParserUtil {
         } else if (scale < 0 || scale > 38) {
           throw new MalformedCarbonCommandException("Invalid value for scale")
         }
-        DataTypeInfo("decimal", precision, scale)
+        DataTypeInfo(columnName, "decimal", precision, scale)
+      case _ =>
+        DataTypeInfo(columnName,
+          DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
+    }
+  }
+
+  /**
+   * This method will return the instantiated DataTypeInfo by parsing the column
+   */
+  def parseColumn(columnName: String, dataType: DataType,
+      values: Option[List[(Int, Int)]]): DataTypeInfo = {
+    // creates parent dataTypeInfo first
+    val dataTypeName = DataTypeConverterUtil.convertToCarbonType(dataType.typeName).getName
+    val dataTypeInfo = CarbonParserUtil.parseDataType(columnName, dataTypeName.toLowerCase, values)
+    // check which child type is present and create children dataTypeInfo accordingly
+    dataType match {
+      case arrayType: ArrayType =>
+        val childType: DataType = arrayType.elementType
+        val childName = columnName + ".val"
+        val childValues = childType match {
+          case d: DecimalType => Some(List((d.precision, d.scale)))
+          case _ => None
+        }
+        val childDatatypeInfo = parseColumn(childName, childType, childValues)
+        dataTypeInfo.setChildren(List(childDatatypeInfo))
+      case structType: StructType =>
+        var childTypeInfoList: List[DataTypeInfo] = null
+        for (childField <- structType) {
+          val childType = childField.dataType
+          val childName = columnName + CarbonCommonConstants.POINT + childField.name
+          val childValues = childType match {
+            case d: DecimalType => Some(List((d.precision, d.scale)))
+            case _ => None
+          }
+          val childDatatypeInfo = CarbonParserUtil.parseColumn(childName, childType, childValues)
+          if (childTypeInfoList == null) {
+            childTypeInfoList = List(childDatatypeInfo)
+          } else {
+            childTypeInfoList = childTypeInfoList :+ childDatatypeInfo
+          }
+        }
+        dataTypeInfo.setChildren(childTypeInfoList)
       case _ =>
-        DataTypeInfo(DataTypeConverterUtil.convertToCarbonType(dataType).getName.toLowerCase)
     }
+    // TODO have to handle for map types [CARBONDATA-4199]
+    dataTypeInfo
   }
 
   def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
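
A behavior-preserving, more idiomatic sketch of the struct-children accumulation in
parseColumn above (same surrounding helpers assumed; StructType is a Seq of StructField):

    val childTypeInfoList = structType.map { childField =>
      val childName = columnName + CarbonCommonConstants.POINT + childField.name
      val childValues = childField.dataType match {
        case d: DecimalType => Some(List((d.precision, d.scale)))
        case _ => None
      }
      parseColumn(childName, childField.dataType, childValues)
    }.toList
    dataTypeInfo.setChildren(childTypeInfoList)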
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index f09f893..6c6030d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -23,7 +23,6 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -160,7 +159,15 @@ case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
     carbonTable: CarbonTable,
     sqlContext: SQLContext)
 
-case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
+case class DataTypeInfo(columnName: String, dataType: String, precision: Int = 0, scale: Int = 0) {
+  private var children: Option[List[DataTypeInfo]] = None
+  def setChildren(childrenList: List[DataTypeInfo]): Unit = {
+    children = Some(childrenList)
+  }
+  def getChildren(): List[DataTypeInfo] = {
+    children.get
+  }
+}
 
 class AlterTableColumnRenameModel(columnName: String,
     newColumnName: String,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index b4e73bc..c381c45 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -34,32 +34,27 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.datatype.DecimalType
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent, AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.DataTypeConverterUtil
 
 abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newColumnName: String)
   extends MetadataCommand {
+  import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
-  protected def validColumnsForRenaming(carbonColumns: mutable.Buffer[CarbonColumn],
-      oldCarbonColumn: CarbonColumn,
+  protected def validColumnsForRenaming(columnSchemaList: mutable.Buffer[ColumnSchema],
+      alteredColumnNamesMap: mutable.LinkedHashMap[String, String],
       carbonTable: CarbonTable): Unit = {
     // check whether new column name is already an existing column name
-    if (carbonColumns.exists(_.getColName.equalsIgnoreCase(newColumnName))) {
+    alteredColumnNamesMap.foreach(keyVal => if (columnSchemaList.exists(_.getColumnName
+      .equalsIgnoreCase(keyVal._2))) {
       throw new MalformedCarbonCommandException(s"Column Rename Operation failed. New " +
-                                                s"column name $newColumnName already exists" +
+                                                s"column name ${ keyVal._2 } already exists" +
                                                 s" in table ${ carbonTable.getTableName }")
-    }
-
-    // if the column rename is for complex column, block the operation
-    if (oldCarbonColumn.isComplex) {
-      throw new MalformedCarbonCommandException(s"Column Rename Operation failed. Rename " +
-                                                s"column is unsupported for complex datatype " +
-                                                s"column ${ oldCarbonColumn.getColName }")
-    }
+    })
 
     // if column rename operation is on bucket column, then fail the rename operation
     if (null != carbonTable.getBucketingInfo) {
@@ -69,8 +64,8 @@ abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newCol
           if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
             throw new MalformedCarbonCommandException(
               s"Column Rename Operation failed. Renaming " +
-                s"the bucket column $oldColumnName is not " +
-                s"allowed")
+              s"the bucket column $oldColumnName is not " +
+              s"allowed")
           }
       }
     }
@@ -83,6 +78,9 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
     childTableColumnRename: Boolean = false)
   extends CarbonAlterTableColumnRenameCommand(alterTableColRenameAndDataTypeChangeModel.columnName,
     alterTableColRenameAndDataTypeChangeModel.newColumnName) {
+  // stores the mapping of altered column names: old-column-name -> new-column-name,
+  // including both parent/table columns and child columns
+  val alteredColumnNamesMap = collection.mutable.LinkedHashMap.empty[String, String]
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -130,6 +128,9 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       val newColumnName = alterTableColRenameAndDataTypeChangeModel.newColumnName.toLowerCase
       val oldColumnName = alterTableColRenameAndDataTypeChangeModel.columnName.toLowerCase
       val isColumnRename = alterTableColRenameAndDataTypeChangeModel.isColumnRename
+      if (isColumnRename) {
+        alteredColumnNamesMap += (oldColumnName -> newColumnName)
+      }
       val newColumnComment = alterTableColRenameAndDataTypeChangeModel.newColumnComment
       val carbonColumns = carbonTable.getCreateOrderColumn().asScala.filter(!_.isInvisible)
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(oldColumnName))) {
@@ -143,27 +144,47 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       val newColumnPrecision = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
       val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
       // set isDataTypeChange flag
-      if (oldCarbonColumn.head.getDataType.getName
-        .equalsIgnoreCase(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType)) {
+      val oldDatatype = oldCarbonColumn.head.getDataType
+      val newDatatype = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType
+      if (isColumnRename && (DataTypes.isMapType(oldDatatype) ||
+                             newDatatype.equalsIgnoreCase(CarbonCommonConstants.MAP))) {
+        throw new UnsupportedOperationException(
+          "Alter rename is unsupported for Map datatype column")
+      }
+      if (oldDatatype.getName.equalsIgnoreCase(newDatatype)) {
         val newColumnPrecision =
           alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
         val newColumnScale = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.scale
         // if the source datatype is decimal and there is change in precision and scale, then
         // along with rename, datatype change is also required for the command, so set the
         // isDataTypeChange flag to true in this case
-        if (oldCarbonColumn.head.getDataType.getName.equalsIgnoreCase("decimal") &&
-            (oldCarbonColumn.head.getDataType.asInstanceOf[DecimalType].getPrecision !=
+        if (DataTypes.isDecimal(oldDatatype) &&
+            (oldDatatype.asInstanceOf[DecimalType].getPrecision !=
              newColumnPrecision ||
-             oldCarbonColumn.head.getDataType.asInstanceOf[DecimalType].getScale !=
+             oldDatatype.asInstanceOf[DecimalType].getScale !=
              newColumnScale)) {
           isDataTypeChange = true
         }
+        if (DataTypes.isArrayType(oldDatatype) || DataTypes.isStructType(oldDatatype)) {
+          val oldParent = oldCarbonColumn.head
+          val oldChildren = oldParent.asInstanceOf[CarbonDimension].getListOfChildDimensions.asScala
+            .toList
+          AlterTableUtil.validateComplexStructure(oldChildren,
+            alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.getChildren(),
+            alteredColumnNamesMap)
+        }
       } else {
+        if (oldDatatype.isComplexType) {
+          throw new UnsupportedOperationException(
+            "Old and new complex columns are not compatible in structure")
+        }
         isDataTypeChange = true
       }
+
       // If there is no columnrename and datatype change and comment change
       // return directly without execution
-      if (!isColumnRename && !isDataTypeChange && !newColumnComment.isDefined) {
+      if (!isColumnRename && !isDataTypeChange && !newColumnComment.isDefined &&
+          alteredColumnNamesMap.isEmpty) {
         return Seq.empty
       }
       // if column datatype change operation is on partition column, then fail the
@@ -178,35 +199,54 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
             }
         }
       }
-      if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
+
+      if (!alteredColumnNamesMap.isEmpty) {
         // validate the columns to be renamed
-        validColumnsForRenaming(carbonColumns, oldCarbonColumn.head, carbonTable)
+        validColumnsForRenaming(carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala,
+          alteredColumnNamesMap, carbonTable)
       }
+
       if (isDataTypeChange) {
         // validate the columns to change datatype
-        validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
+        AlterTableUtil.validateColumnDataType(alterTableColRenameAndDataTypeChangeModel
+          .dataTypeInfo,
           oldCarbonColumn.head)
       }
       // read the latest schema file
       val tableInfo: TableInfo =
         metaStore.getThriftTableInfo(carbonTable)
       // maintain the added column for schema evolution history
-      var addColumnSchema: ColumnSchema = null
+      var addedTableColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null
       var schemaEvolutionEntry: SchemaEvolutionEntry = null
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
 
+      var addedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
+      var deletedColumnsList: List[ColumnSchema] = List.empty[ColumnSchema]
+
+      /*
+      * columnSchemaList is a flat structure containing all column schemas, including both
+      * parent and child columns.
+      * It is iterated, and rename/change-datatype updates are made in this list itself.
+      * An entry is made in the schemaEvolutionEntry for each update.
+      */
       columnSchemaList.foreach { columnSchema =>
-        if (columnSchema.column_name.equalsIgnoreCase(oldColumnName)) {
-          deletedColumnSchema = columnSchema.deepCopy()
-          if (alterTableColRenameAndDataTypeChangeModel.isColumnRename) {
-            // if only column rename, just get the column schema and rename, make a
-            // schemaEvolutionEntry
+        val columnName = columnSchema.column_name
+        val isTableColumnAltered = columnName.equalsIgnoreCase(oldColumnName)
+        var isSchemaEntryRequired = false
+        deletedColumnSchema = columnSchema.deepCopy()
+
+        if (isTableColumnAltered) {
+          // isColumnRename will be true if the table/parent column name has been altered;
+          // just get the columnSchema, rename it, and make a schemaEvolutionEntry
+          if (isColumnRename) {
             columnSchema.setColumn_name(newColumnName)
+            isSchemaEntryRequired = true
           }
-          // if the column rename is false,it will be just datatype change only, then change the
-          // datatype and make an evolution entry, If both the operations are happening, then rename
-          // change datatype and make an evolution entry
+
+          // if the table column rename is false, it is a datatype change only; change the
+          // datatype and make an evolution entry. If both operations are happening, then
+          // rename, change the datatype, and make an evolution entry
           if (isDataTypeChange) {
             // if only datatype change,  just get the column schema and change datatype, make a
             // schemaEvolutionEntry
@@ -216,7 +256,9 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
             columnSchema
               .setPrecision(newColumnPrecision)
             columnSchema.setScale(newColumnScale)
+            isSchemaEntryRequired = true
           }
+
           if (newColumnComment.isDefined && columnSchema.getColumnProperties != null) {
             columnSchema.getColumnProperties.put(
               CarbonCommonConstants.COLUMN_COMMENT, newColumnComment.get)
@@ -225,11 +267,37 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
             newColumnProperties.put(CarbonCommonConstants.COLUMN_COMMENT, newColumnComment.get)
             columnSchema.setColumnProperties(newColumnProperties)
           }
-          addColumnSchema = columnSchema
-          timeStamp = System.currentTimeMillis()
-          // make a new schema evolution entry after column rename or datatype change
-          schemaEvolutionEntry = AlterTableUtil
-            .addNewSchemaEvolutionEntry(timeStamp, addColumnSchema, deletedColumnSchema)
+          addedTableColumnSchema = columnSchema
+        } else if (isComplexChild(columnSchema)) {
+          if (alteredColumnNamesMap.contains(columnName)) {
+            // matches exactly
+            val newComplexChildName = alteredColumnNamesMap(columnName)
+            columnSchema.setColumn_name(newComplexChildName)
+            isSchemaEntryRequired = true
+          } else {
+            val alteredParent = checkIfParentIsAltered(columnName)
+            /*
+             * Let's say the complex schema is: str struct<a: int>,
+             * and the parent column is renamed from str -> str2.
+             * Then its child name should also be changed from str.a -> str2.a
+             */
+            if (alteredParent != null) {
+              val newParent = alteredColumnNamesMap(alteredParent)
+              val newComplexChildName = newParent + columnName
+                .split(alteredParent)(1)
+              columnSchema.setColumn_name(newComplexChildName)
+              isSchemaEntryRequired = true
+            }
+          }
+        }
+
+        // make a new schema evolution entry after column rename or datatype change
+        if (isSchemaEntryRequired) {
+          addedColumnsList ++= List(columnSchema)
+          deletedColumnsList ++= List(deletedColumnSchema)
+          schemaEvolutionEntry = AlterTableUtil.addNewSchemaEvolutionEntry(schemaEvolutionEntry,
+            addedColumnsList,
+            deletedColumnsList)
         }
       }
 
@@ -242,7 +310,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       updateSchemaAndRefreshTable(sparkSession,
         carbonTable,
         tableInfo,
-        addColumnSchema,
+        addedTableColumnSchema,
         schemaEvolutionEntry,
         oldCarbonColumn.head)
       val alterTableColRenameAndDataTypeChangePostEvent
@@ -279,6 +347,24 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
     Seq.empty
   }
 
+  private def isComplexChild(columnSchema: ColumnSchema): Boolean = {
+    columnSchema.column_name.contains(CarbonCommonConstants.POINT)
+  }
+
+  private def isChildOfTheGivenColumn(columnSchemaName: String, oldColumnName: String): Boolean = {
+    columnSchemaName.startsWith(oldColumnName + CarbonCommonConstants.POINT)
+  }
+
+  private def checkIfParentIsAltered(columnSchemaName: String): String = {
+    var parent: String = null
+    alteredColumnNamesMap.foreach(keyVal => {
+      if (isChildOfTheGivenColumn(columnSchemaName, keyVal._1)) {
+        parent = keyVal._1
+      }
+    })
+    parent
+  }
+
   /**
    * This method update the schema info and refresh the table
    *
@@ -323,53 +409,5 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
 
-  /**
-   * This method will validate a column for its data type and check whether the column data type
-   * can be modified and update if conditions are met.
-   */
-  private def validateColumnDataType(
-      dataTypeInfo: DataTypeInfo,
-      carbonColumn: CarbonColumn): Unit = {
-    carbonColumn.getDataType.getName match {
-      case "INT" =>
-        if (!dataTypeInfo.dataType.equalsIgnoreCase("bigint") &&
-            !dataTypeInfo.dataType.equalsIgnoreCase("long")) {
-          sys.error(s"Given column ${ carbonColumn.getColName } with data type " +
-                    s"${ carbonColumn.getDataType.getName } cannot be modified. " +
-                    s"Int can only be changed to bigInt or long")
-        }
-      case "DECIMAL" =>
-        if (!dataTypeInfo.dataType.equalsIgnoreCase("decimal")) {
-          sys.error(s"Given column ${ carbonColumn.getColName } with data type" +
-                    s" ${ carbonColumn.getDataType.getName } cannot be modified." +
-                    s" Decimal can be only be changed to Decimal of higher precision")
-        }
-        if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) {
-          sys.error(s"Given column ${ carbonColumn.getColName } cannot be modified. " +
-                    s"Specified precision value ${ dataTypeInfo.precision } should be " +
-                    s"greater than current precision value " +
-                    s"${ carbonColumn.getColumnSchema.getPrecision }")
-        } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) {
-          sys.error(s"Given column ${ carbonColumn.getColName } cannot be modified. " +
-                    s"Specified scale value ${ dataTypeInfo.scale } should be greater or " +
-                    s"equal to current scale value ${ carbonColumn.getColumnSchema.getScale }")
-        } else {
-          // difference of precision and scale specified by user should not be less than the
-          // difference of already existing precision and scale else it will result in data loss
-          val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision -
-                                               carbonColumn.getColumnSchema.getScale
-          val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale
-          if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) {
-            sys.error(s"Given column ${ carbonColumn.getColName } cannot be modified. " +
-                      s"Specified precision and scale values will lead to data loss")
-          }
-        }
-      case _ =>
-        sys.error(s"Given column ${ carbonColumn.getColName } with data type " +
-                  s"${ carbonColumn.getDataType.getName } cannot be modified. " +
-                  s"Only Int and Decimal data types are allowed for modification")
-    }
-  }
-
   override protected def opName: String = "ALTER TABLE CHANGE DATA TYPE OR RENAME COLUMN"
 }
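
A small sketch of the child-name propagation above: when a parent is renamed, every child
prefixed by it is rewritten (hypothetical values; the command derives the new child name
from alteredColumnNamesMap):

    import scala.collection.mutable
    val alteredColumnNamesMap = mutable.LinkedHashMap("str" -> "str2")
    val childName = "str.a"
    // find the altered parent whose prefix matches this child column
    val parent = alteredColumnNamesMap.keys.find(p => childName.startsWith(p + "."))
    val newChildName = parent.map(p => alteredColumnNamesMap(p) + childName.stripPrefix(p))
    // newChildName == Some("str2.a")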
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala
index 03f4330..85d0bff 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLHelper.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.table._
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, RefreshResource, RefreshTable}
 import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 import org.apache.spark.sql.parser.{CarbonSpark2SqlParser, CarbonSparkSqlParserUtil}
-import org.apache.spark.sql.types.{DecimalType, Metadata}
+import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
@@ -40,7 +40,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.DataTypeConverterUtil
 import org.apache.carbondata.view.MVManagerInSpark
 
 object DDLHelper {
@@ -225,12 +224,7 @@ object DDLHelper {
         case d: DecimalType => Some(List((d.precision, d.scale)))
         case _ => None
       }
-      val dataTypeInfo = CarbonParserUtil.parseDataType(
-        DataTypeConverterUtil
-          .convertToCarbonType(newColumn.dataType.typeName)
-          .getName
-          .toLowerCase,
-        values)
+      val dataTypeInfo = CarbonParserUtil.parseColumn(newColumn.name, newColumn.dataType, values)
       var newColumnComment: Option[String] = Option.empty
       if (newColumnMetaData != null &&
         newColumnMetaData.contains(CarbonCommonConstants.COLUMN_COMMENT)) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
index be949b4..c5842bb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/SqlAstBuilderHelper.scala
@@ -43,7 +43,7 @@ trait SqlAstBuilderHelper extends SparkSqlAstBuilder {
 
     val alterTableColRenameAndDataTypeChangeModel =
       AlterTableDataTypeChangeModel(
-        CarbonParserUtil.parseDataType(typeString, values),
+        CarbonParserUtil.parseDataType(newColumn.name, typeString, values),
         CarbonParserUtil.convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
         ctx.tableIdentifier().table.getText.toLowerCase,
         ctx.identifier.getText.toLowerCase,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 54749c9..b678707 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -667,7 +667,7 @@ object CarbonSparkSqlParserUtil {
     val isColumnRename = !columnName.equalsIgnoreCase(columnNameCopy)
     val alterTableColRenameAndDataTypeChangeModel =
       AlterTableDataTypeChangeModel(
-        CarbonParserUtil.parseDataType(dataType.toLowerCase,
+        CarbonParserUtil.parseDataType(columnName, dataType.toLowerCase,
           values),
         CarbonParserUtil.convertDbNameToLowerCase(dbName),
         table.toLowerCase,
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index e3ec06e..3250f7a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.execution.command.DataTypeInfo
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil}
 import org.apache.spark.sql.index.CarbonIndexUtil
 
@@ -44,7 +45,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -357,18 +358,23 @@ object AlterTableUtil {
   /**
    * This method create a new SchemaEvolutionEntry and adds to SchemaEvolutionEntry List
    *
-   * @param addColumnSchema          added new column schema
-   * @param deletedColumnSchema      old column schema which is deleted
+   * @param addedColumnsList    list of added column schemas
+   * @param deletedColumnsList  list of deleted column schemas
    * @return
    */
   def addNewSchemaEvolutionEntry(
-      timeStamp: Long,
-      addColumnSchema: org.apache.carbondata.format.ColumnSchema,
-      deletedColumnSchema: org.apache.carbondata.format.ColumnSchema): SchemaEvolutionEntry = {
-    val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-    schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
-    schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
-    schemaEvolutionEntry
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      addedColumnsList: List[org.apache.carbondata.format.ColumnSchema],
+      deletedColumnsList: List[org.apache.carbondata.format.ColumnSchema]): SchemaEvolutionEntry = {
+    val timeStamp = System.currentTimeMillis()
+    val newSchemaEvolutionEntry = if (schemaEvolutionEntry == null) {
+      new SchemaEvolutionEntry(timeStamp)
+    } else {
+      schemaEvolutionEntry
+    }
+    newSchemaEvolutionEntry.setAdded(addedColumnsList.asJava)
+    newSchemaEvolutionEntry.setRemoved(deletedColumnsList.asJava)
+    newSchemaEvolutionEntry
   }
 
   def readLatestTableSchema(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo = {
@@ -1071,4 +1077,105 @@ object AlterTableUtil {
       }
     }
   }
+
+  /**
+   * This method checks the structure of the old and new complex columns, and
+   * 1. throws an exception if the number of nesting levels in both columns does not match
+   * 2. throws an exception if the number of children of both columns does not match
+   * 3. fills alteredColumnNamesMap: old_column_name -> new_column_name, for those columns
+   *    whose names are altered.
+   * This map will later be used while altering the table schema.
+   */
+  def validateComplexStructure(oldDimensionList: List[CarbonDimension],
+      newDimensionList: List[DataTypeInfo],
+      alteredColumnNamesMap: mutable.LinkedHashMap[String, String]): Unit = {
+    if (oldDimensionList == null && newDimensionList == null) {
+      throw new UnsupportedOperationException("Both old and new dimensions are null")
+    } else if (oldDimensionList == null || newDimensionList == null) {
+      throw new UnsupportedOperationException("Either the old or the new dimension is null")
+    } else if (oldDimensionList.size != newDimensionList.size) {
+      throw new UnsupportedOperationException(
+        "Number of children of old and new complex columns are not the same")
+    } else {
+      for ((newDimensionInfo, i) <- newDimensionList.zipWithIndex) {
+        val oldDimensionInfo = oldDimensionList(i)
+        val old_column_name = oldDimensionInfo
+          .getColName.split(CarbonCommonConstants.POINT.toCharArray).last
+        val old_column_datatype = oldDimensionInfo.getDataType.getName
+        val new_column_name = newDimensionInfo
+          .columnName.split(CarbonCommonConstants.POINT.toCharArray).last
+        val new_column_datatype = newDimensionInfo.dataType
+        if (!old_column_datatype.equalsIgnoreCase(new_column_datatype)) {
+          // datatypes of complex children cannot be altered. So throwing exception for now.
+          throw new UnsupportedOperationException(
+            "Altering datatypes of any child column is not supported")
+        }
+        if (!old_column_name.equalsIgnoreCase(new_column_name)) {
+          alteredColumnNamesMap += (oldDimensionInfo.getColName -> newDimensionInfo.columnName)
+        }
+        if (old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.MAP) ||
+            new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.MAP)) {
+          throw new UnsupportedOperationException(
+            "Cannot alter complex structure that includes map type column")
+        } else if (new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
+                   old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
+                   new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.STRUCT) ||
+                   old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.STRUCT)) {
+          validateComplexStructure(oldDimensionInfo.getListOfChildDimensions.asScala.toList,
+            newDimensionInfo.getChildren(), alteredColumnNamesMap)
+        }
+      }
+    }
+  }
+
+  /**
+   * This method will validate a column for its data type and check whether the column data type
+   * can be modified and update if conditions are met.
+   */
+  def validateColumnDataType(
+      dataTypeInfo: DataTypeInfo,
+      carbonColumn: CarbonColumn): Unit = {
+    carbonColumn.getDataType.getName.toLowerCase() match {
+      case CarbonCommonConstants.INT =>
+        if (!dataTypeInfo.dataType.equalsIgnoreCase(CarbonCommonConstants.BIGINT) &&
+            !dataTypeInfo.dataType.equalsIgnoreCase(CarbonCommonConstants.LONG)) {
+          sys.error(s"Given column ${ carbonColumn.getColName } with data type " +
+                    s"${ carbonColumn.getDataType.getName } cannot be modified. " +
+                    s"Int can only be changed to bigInt or long")
+        }
+      case CarbonCommonConstants.DECIMAL =>
+        if (!dataTypeInfo.dataType.equalsIgnoreCase(CarbonCommonConstants.DECIMAL)) {
+          sys.error(s"Given column ${ carbonColumn.getColName } with data type" +
+                    s" ${ carbonColumn.getDataType.getName } cannot be modified." +
+                    s" Decimal can be only be changed to Decimal of higher precision")
+        }
+        if (dataTypeInfo.precision <= carbonColumn.getColumnSchema.getPrecision) {
+          sys.error(s"Given column ${ carbonColumn.getColName } cannot be modified. " +
+                    s"Specified precision value ${ dataTypeInfo.precision } should be " +
+                    s"greater than current precision value " +
+                    s"${ carbonColumn.getColumnSchema.getPrecision }")
+        } else if (dataTypeInfo.scale < carbonColumn.getColumnSchema.getScale) {
+          sys.error(s"Given column ${ carbonColumn.getColName } cannot be modified. " +
+                    s"Specified scale value ${ dataTypeInfo.scale } should be greater or " +
+                    s"equal to current scale value ${ carbonColumn.getColumnSchema.getScale }")
+        } else {
+          // difference of precision and scale specified by user should not be less than the
+          // difference of already existing precision and scale else it will result in data loss
+          val carbonColumnPrecisionScaleDiff = carbonColumn.getColumnSchema.getPrecision -
+                                               carbonColumn.getColumnSchema.getScale
+          val dataInfoPrecisionScaleDiff = dataTypeInfo.precision - dataTypeInfo.scale
+          if (dataInfoPrecisionScaleDiff < carbonColumnPrecisionScaleDiff) {
+            sys.error(s"Given column ${ carbonColumn.getColName } cannot be modified. " +
+                      s"Specified precision and scale values will lead to data loss")
+          }
+        }
+      case _ =>
+        if (!carbonColumn.getDataType.getName.equalsIgnoreCase(dataTypeInfo.dataType)) {
+          sys.error(s"Given column ${ carbonColumn.getColName } with data type " +
+                    s"${ carbonColumn.getDataType.getName } cannot be modified. " +
+                    s"Only Int and Decimal data types are allowed for modification")
+        }
+    }
+  }
+
 }
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index ffc09f0..ea81c74 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -17,11 +17,15 @@
 
 package org.apache.spark.carbondata.restructure.vectorreader
 
-import org.apache.spark.sql.AnalysisException
+import scala.collection.JavaConverters
+import scala.collection.mutable.WrappedArray.make
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
@@ -48,6 +52,299 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table simple_table")
   }
 
+  test("Rename more than one column at a time in one operation") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (str struct<a:struct<b:int, d:int>, c:int>) STORED AS carbondata")
+    sql("insert into test_rename values(named_struct('a',named_struct('b',12,'d',12), 'c', 12))")
+    sql("alter table test_rename change str str22 struct<a11:struct<b2:int, d:int>, c:int>")
+    sql("insert into test_rename values(named_struct('a11',named_struct('b2',24,'d',24), 'c', 24))")
+
+    val rows = sql("select str22.a11.b2 from test_rename").collect()
+    assert(rows(0).equals(Row(12)) && rows(1).equals(Row(24)))
+    // check if old column names are still present
+    val ex1 = intercept[AnalysisException] {
+      sql("select str from test_rename").show(false)
+    }
+    assert(ex1.getMessage.contains("cannot resolve '`str`'"))
+
+    val ex2 = intercept[AnalysisException] {
+      sql("select str.a from test_rename").show(false)
+    }
+    assert(ex2.getMessage.contains("cannot resolve '`str.a`'"))
+
+    // check un-altered columns
+    val rows1 = sql("select str22.c from test_rename").collect()
+    val rows2 = sql("select str22.a11.d from test_rename").collect()
+    assert(rows1.sameElements(Array(Row(12), Row(24))))
+    assert(rows2.sameElements(Array(Row(12), Row(24))))
+  }
+
+  test("rename complex columns with invalid structure/duplicate-names/Map-type") {
+    sql("drop table if exists test_rename")
+    sql(
+      "CREATE TABLE test_rename (str struct<a:int,b:long>, str2 struct<a:int,b:long>, map1 " +
+      "map<string, string>, str3 struct<a:int, b:map<string, string>>) STORED AS carbondata")
+
+    val ex1 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str str struct<a:array<int>,b:long>")
+    }
+    assert(ex1.getMessage
+      .contains(
+        "column rename operation failed: Altering datatypes of any child column is not supported"))
+
+    val ex2 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str str struct<a:int,b:long,c:int>")
+    }
+    assert(ex2.getMessage
+      .contains(
+        "column rename operation failed: Number of children of old and new complex columns are " +
+        "not the same"))
+
+    val ex3 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str str int")
+    }
+    assert(ex3.getMessage
+      .contains(
+        "column rename operation failed: Old and new complex columns are not compatible " +
+        "in structure"))
+
+    val ex4 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str str struct<a:int,a:long>")
+    }
+    assert(ex4.getMessage
+      .contains(
+        "Column Rename Operation failed. New column name str.a already exists in table " +
+        "test_rename"))
+
+    val ex5 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str str2 struct<a:int,b:long>")
+    }
+    assert(ex5.getMessage
+      .contains(
+        "Column Rename Operation failed. New column name str2 already exists in table test_rename"))
+
+    val ex6 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change map1 map2 map<string, struct<a:int>>")
+    }
+    assert(ex6.getMessage
+      .contains("rename operation failed: Alter rename is unsupported for Map datatype column"))
+
+    val ex7 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str3 str33 struct<a:int, bc:map<string, string>>")
+    }
+    assert(ex7.getMessage
+      .contains(
+        "rename operation failed: Cannot alter complex structure that includes map type column"))
+
+    val ex8 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str2 str22 struct<>")
+    }
+    assert(ex8.getMessage
+      .contains(
+        "rename operation failed: Either the old or the new dimension is null"))
+
+    // ensure all failed rename operations have been reverted to original state
+    val describe = sql("desc table test_rename")
+    assert(describe.collect().size == 4)
+    assertResult(1)(describe.filter(
+      "col_name='str' and data_type = 'struct<a:int,b:bigint>'").count())
+    assertResult(1)(describe.filter(
+      "col_name='str2' and data_type = 'struct<a:int,b:bigint>'").count())
+    assertResult(1)(describe.filter(
+      "col_name='map1' and data_type = 'map<string,string>'").count())
+    assertResult(1)(describe.filter(
+      "col_name='str3' and data_type = 'struct<a:int,b:map<string,string>>'").count())
+  }
+
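+  // Helpers asserting the same values at every projection depth: the full struct,
+  // the nested struct and the leaf primitive, for one and two rows respectively.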
+  def checkAnswerUtil1(df1: DataFrame, df2: DataFrame, df3: DataFrame) {
+    checkAnswer(df1, Seq(Row(Row(Row(2)))))
+    checkAnswer(df2, Seq(Row(Row(2))))
+    checkAnswer(df3, Seq(Row(2)))
+  }
+
+  def checkAnswerUtil2(df1: DataFrame, df2: DataFrame, df3: DataFrame) {
+    checkAnswer(df1, Seq(Row(Row(Row(2))), Row(Row(Row(3)))))
+    checkAnswer(df2, Seq(Row(Row(2)), Row(Row(3))))
+    checkAnswer(df3, Seq(Row(2), Row(3)))
+  }
+
+  test("test alter rename struct of (primitive/struct/array)") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (str1 struct<a:int>, str2 struct<a:struct<b:int>>, str3 " +
+        "struct<a:struct<b:struct<c:int>>>, intfield int) STORED AS carbondata")
+    sql("insert into test_rename values(named_struct('a', 2), " +
+        "named_struct('a', named_struct('b', 2)), named_struct('a', named_struct('b', " +
+        "named_struct('c', 2))), 1)")
+
+    // Operation 1: rename parent column from str2 to str22 and read old rows
+    sql("alter table test_rename change str2 str22 struct<a:struct<b:int>>")
+    var df1 = sql("select str22 from test_rename")
+    var df2 = sql("select str22.a from test_rename")
+    var df3 = sql("select str22.a.b from test_rename")
+    assert(df1.collect().size == 1 && df2.collect().size == 1 && df3.collect().size == 1)
+    checkAnswerUtil1(df1, df2, df3)
+
+    // Operation 2: rename child column from a to a11
+    sql("alter table test_rename change str22 str22 struct<a11:struct<b:int>>")
+    df1 = sql("select str22 from test_rename")
+    df2 = sql("select str22.a11 from test_rename")
+    df3 = sql("select str22.a11.b from test_rename")
+    assert(df1.collect().size == 1 && df2.collect().size == 1 && df3.collect().size == 1)
+    checkAnswerUtil1(df1, df2, df3)
+
+    // Operation 3: rename parent column from str22 to str33
+    sql("alter table test_rename change str22 str33 struct<a11:struct<b:int>>")
+    df1 = sql("select str33 from test_rename")
+    df2 = sql("select str33.a11 from test_rename")
+    df3 = sql("select str33.a11.b from test_rename")
+    assert(df1.collect().size == 1 && df2.collect().size == 1 && df3.collect().size == 1)
+    checkAnswerUtil1(df1, df2, df3)
+
+    // insert new rows
+    sql("insert into test_rename values(named_struct('a', 3), " +
+        "named_struct('a', named_struct('b', 3)), named_struct('a', named_struct('b', " +
+        "named_struct('c', 3))), 2)")
+    df1 = sql("select str33 from test_rename")
+    df2 = sql("select str33.a11 from test_rename")
+    df3 = sql("select str33.a11.b from test_rename")
+    assert(df1.collect().size == 2 && df2.collect().size == 2 && df3.collect().size == 2)
+    checkAnswerUtil2(df1, df2, df3)
+
+    // Operation 4: rename child column from a11 to a22 & b to b11
+    sql("alter table test_rename change str33 str33 struct<a22:struct<b11:int>>")
+    df1 = sql("select str33 from test_rename")
+    df2 = sql("select str33.a22 from test_rename")
+    df3 = sql("select str33.a22.b11 from test_rename")
+    assert(df1.collect().size == 2 && df2.collect().size == 2 && df3.collect().size == 2)
+    checkAnswerUtil2(df1, df2, df3)
+
+    // Operation 5: rename primitive column from intField to intField2
+    sql("alter table test_rename change intField intField2 int")
+
+    val describe = sql("desc table test_rename")
+    assert(describe.collect().size == 4)
+    assertResult(1)(describe.filter(
+      "col_name='str1' and data_type = 'struct<a:int>'").count())
+    assertResult(1)(describe.filter(
+      "col_name='str33' and data_type = 'struct<a22:struct<b11:int>>'").count())
+    assertResult(1)(describe.filter(
+      "col_name='str3' and data_type = 'struct<a:struct<b:struct<c:int>>>'").count())
+
+    // validate schema evolution entries for the 5 alter operations above
+    val (addedColumns, removedColumns, noOfEvolutions) = returnValuesAfterSchemaEvolution(
+      "test_rename")
+    validateSchemaEvolution(addedColumns, removedColumns, noOfEvolutions)
+  }
+
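+  // Collects the added and removed column schemas from all schema evolution entries of
+  // the table; the returned count excludes the initial table-creation entry.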
+  def returnValuesAfterSchemaEvolution(
+      tableName: String): (Seq[ColumnSchema], Seq[ColumnSchema], Int) = {
+    val carbonTable = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
+    val schemaEvolutionList = carbonTable.getTableInfo
+      .getFactTable
+      .getSchemaEvolution()
+      .getSchemaEvolutionEntryList()
+    var addedColumns = Seq[ColumnSchema]()
+    var removedColumns = Seq[ColumnSchema]()
+    for (i <- 0 until schemaEvolutionList.size()) {
+      addedColumns ++=
+      JavaConverters
+        .asScalaIteratorConverter(schemaEvolutionList.get(i).getAdded.iterator())
+        .asScala
+        .toSeq
+
+      removedColumns ++=
+      JavaConverters
+        .asScalaIteratorConverter(schemaEvolutionList.get(i).getRemoved.iterator())
+        .asScala
+        .toSeq
+    }
+    (addedColumns, removedColumns, schemaEvolutionList.size() - 1)
+  }
+
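+  // Each rename records the renamed column and every descendant whose qualified name
+  // changed as an added/removed pair in the corresponding schema evolution entry.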
+  def validateSchemaEvolution(added: Seq[ColumnSchema], removed: Seq[ColumnSchema],
+      noOfEvolutions: Int): Unit = {
+    assert(noOfEvolutions == 5 && added.size == 11 && removed.size == 11)
+    // asserting only first 6 entries of added and removed columns
+    assert(
+      added(0).getColumnName.equals("str22") && removed(0).getColumnName.equals("str2") &&
+      added(1).getColumnName.equals("str22.a") && removed(1).getColumnName.equals("str2.a") &&
+      added(2).getColumnName.equals("str22.a.b") && removed(2).getColumnName.equals("str2.a.b") &&
+      added(3).getColumnName.equals("str22.a11") && removed(3).getColumnName.equals("str22.a") &&
+      added(4).getColumnName.equals("str22.a11.b") &&
+      removed(4).getColumnName.equals("str22.a.b") &&
+      added(5).getColumnName.equals("str33") && removed(5).getColumnName.equals("str22"))
+  }
+
+  test("test alter rename array of (primitive/array/struct)") {
+    sql("drop table if exists test_rename")
+    sql(
+      "CREATE TABLE test_rename (arr1 array<int>, arr2 array<array<int>>, arr3 array<string>, " +
+      "arr4 array<struct<a:int>>) STORED AS carbondata")
+    sql(
+      "insert into test_rename values (array(1,2,3), array(array(1,2),array(3,4)), array('hello'," +
+      "'world'), array(named_struct('a',45)))")
+
+    sql("alter table test_rename change arr1 arr11 array<int>")
+    val df1 = sql("select arr11 from test_rename")
+    assert(df1.collect.size == 1)
+    checkAnswer(df1, Seq(Row(make(Array(1, 2, 3)))))
+
+    sql("alter table test_rename change arr2 arr22 array<array<int>>")
+    val df2 = sql("select arr22 from test_rename")
+    assert(df2.collect.size == 1)
+    checkAnswer(df2, Seq(Row(make(Array(make(Array(1, 2)), make(Array(3, 4)))))))
+
+    sql("alter table test_rename change arr3 arr33 array<string>")
+    val df3 = sql("select arr33 from test_rename")
+    assert(df3.collect.size == 1)
+    checkAnswer(df3, Seq(Row(make(Array("hello", "world")))))
+
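+    // rename the array column first, then rename the struct child inside its element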
+    sql("alter table test_rename change arr4 arr44 array<struct<a:int>>")
+    sql("alter table test_rename change arr44 arr44 array<struct<a11:int>>")
+
+    val df4 = sql("select arr44.a11 from test_rename")
+    assert(df4.collect.size == 1)
+    checkAnswer(df4, Seq(Row(make(Array(45)))))
+
+    // test for new inserted row
+    sql(
+      "insert into test_rename values (array(11,22,33), array(array(11,22),array(33,44)), array" +
+      "('hello11', 'world11'), array(named_struct('a',4555)))")
+    val rows = sql("select arr11, arr22, arr33, arr44.a11 from test_rename").collect
+    assert(rows.size == 2)
+    val secondRow = rows(1)
+    assert(secondRow(0).equals(make(Array(11, 22, 33))) &&
+           secondRow(1).equals(make(Array(make(Array(11, 22)), make(Array(33, 44))))) &&
+           secondRow(2).equals(make(Array("hello11", "world11"))) &&
+           secondRow(3).equals(make(Array(4555))))
+  }
+
+  test("validate alter change datatype for complex children columns") {
+    sql("drop table if exists test_rename")
+    sql(
+      "CREATE TABLE test_rename (str struct<a:int,b:long>) STORED AS carbondata")
+
+    val ex1 = intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change str str struct<a:long,b:long>")
+    }
+    assert(ex1.getMessage
+      .contains(
+        "column rename operation failed: Altering datatypes of any child column is not supported"))
+  }
+
+  test("test change comment in case of complex types") {
+    sql("drop table if exists test_rename")
+    sql(
+      "CREATE TABLE test_rename (str struct<a:int> comment 'comment') STORED AS carbondata")
+    sql("alter table test_rename change str str struct<a:int> comment 'new comment'")
+    var describe = sql("desc table test_rename")
+    var count = describe.filter("col_name='str' and comment = 'new comment'").count()
+    assertResult(1)(count)
+
+    sql("alter table test_rename change str str struct<a1:int> comment 'new comment 2'")
+    describe = sql("desc table test_rename")
+    count = describe.filter("col_name='str' and comment = 'new comment 2'").count()
+    assertResult(1)(count)
+  }
+
   test("test only column rename operation with datatype change also") {
     dropTable()
     createTable()