Posted to commits@carbondata.apache.org by in...@apache.org on 2021/08/26 09:16:05 UTC

[carbondata] branch master updated: [CARBONDATA-4164][CARBONDATA-4198][CARBONDATA-4199][CARBONDATA-4234] Support alter add map, multilevel complex columns and rename/change datatype.

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f52aa20  [CARBONDATA-4164][CARBONDATA-4198][CARBONDATA-4199][CARBONDATA-4234] Support alter add map, multilevel complex columns and rename/change datatype.
f52aa20 is described below

commit f52aa20a82b3e9766d34511ac4296b50c5a3ea9e
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Wed Jul 14 22:01:14 2021 +0530

    [CARBONDATA-4164][CARBONDATA-4198][CARBONDATA-4199][CARBONDATA-4234] Support alter add map, multilevel complex columns and rename/change datatype.
    
    Why is this PR needed?
    Support alter add for map and multi-level complex columns, and alter change datatype for complex types.
    
    What changes were proposed in this PR?
    1. Support adding single-level and multi-level map columns
    2. Support adding multi-level complex columns (array/struct)
    3. Support renaming map columns, including at nested levels
    4. Support altering the datatype at nested levels (array/map/struct); see the usage sketch below
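
    A minimal usage sketch of the operations enabled above (table and column
    names are illustrative; `sql` is a Spark session helper, as in the test
    suites touched by this patch):

    ```
    sql("ALTER TABLE tbl ADD COLUMNS(mapField map<string, array<int>>)")
    sql("ALTER TABLE tbl ADD COLUMNS(arrField array<array<int>>)")
    sql("ALTER TABLE tbl CHANGE oldMapCol newMapCol map<int, int>")
    sql("ALTER TABLE tbl CHANGE structField structField struct<id:long>")
    ```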
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4180
---
 .../impl/RestructureBasedRawResultCollector.java   |  35 +++--
 .../core/scan/complextypes/PrimitiveQueryType.java |   7 +-
 .../core/scan/executor/util/RestructureUtil.java   |  49 ++++++-
 .../apache/carbondata/core/util/DataTypeUtil.java  |   9 +-
 docs/ddl-of-carbondata.md                          |  37 +++++-
 .../TestSIWithComplexArrayType.scala               |  73 ++++++++++-
 .../query/SecondaryIndexQueryResultProcessor.java  |  15 ++-
 .../spark/sql/catalyst/CarbonParserUtil.scala      |  63 +++++----
 .../command/carbonTableSchemaCommon.scala          |  71 +++++-----
 ...nAlterTableColRenameDataTypeChangeCommand.scala |  65 +++++----
 .../org/apache/spark/util/AlterTableUtil.scala     |  44 ++++---
 .../alterTable/TestAlterTableAddColumns.scala      |  95 +++++++++-----
 .../AlterTableColumnRenameTestCase.scala           | 146 +++++++++++++++++----
 .../vectorreader/ChangeDataTypeTestCases.scala     |  52 ++++++++
 14 files changed, 567 insertions(+), 194 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
index 80fb3c3..af0ddb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/RestructureBasedRawResultCollector.java
@@ -166,21 +166,11 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
                   DataTypeUtil.getDataTypeConverter().convertFromByteToUTF8Bytes(
                       CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
             } else if (currDataType.isComplexType()) {
-              // Iterate over child dimensions and add its default value.
-              List<CarbonDimension> children =
-                  actualQueryDimensions[i].getDimension().getListOfChildDimensions();
               try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
                   DataOutputStream dataOutputStream = new DataOutputStream(byteStream)) {
-                if (DataTypes.isArrayType(currDataType)) {
-                  dataOutputStream.writeInt(1);
-                } else if (DataTypes.isStructType(currDataType)) {
-                  dataOutputStream.writeShort(children.size());
-                }
-                for (int j = 0; j < children.size(); j++) {
-                  // update default null values based on datatype
-                  CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream,
-                      children.get(j).getDataType());
-                }
+                // Iterate over child dimensions and add its default value.
+                addDefaultValueForComplexTypes(actualQueryDimensions[i].getDimension(),
+                    currDataType, dataOutputStream);
                 newColumnDefaultValue = byteStream.toByteArray();
               } catch (IOException e) {
                 LOGGER.error(e.getMessage(), e);
@@ -200,4 +190,23 @@ public class RestructureBasedRawResultCollector extends RawBasedResultCollector
       byteArrayWrapper.setComplexTypesKeys(complexTypeKeyArrayWithNewlyAddedColumns);
     }
   }
+
+  private void addDefaultValueForComplexTypes(CarbonDimension dimension, DataType currDataType,
+      DataOutputStream dataOutputStream) throws IOException {
+    List<CarbonDimension> children = dimension.getListOfChildDimensions();
+    if (DataTypes.isArrayType(currDataType) || DataTypes.isMapType(currDataType)) {
+      dataOutputStream.writeInt(1);
+    } else if (DataTypes.isStructType(currDataType)) {
+      dataOutputStream.writeShort(children.size());
+    }
+    for (int j = 0; j < children.size(); j++) {
+      if (children.get(j).isComplex()) {
+        addDefaultValueForComplexTypes(children.get(j), children.get(j).getDataType(),
+            dataOutputStream);
+      } else {
+        // update default null values based on datatype
+        CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, children.get(j).getDataType());
+      }
+    }
+  }
 }
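
For intuition, a plain-Scala sketch (Carbon helpers elided) of the byte layout the method above produces for a newly added column of type array<struct<a:int,b:string>>:

```
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Assumed layout, following addDefaultValueForComplexTypes: array/map headers
// write an int element count, struct headers write a short child count, and
// each primitive child contributes a datatype-specific null marker (written
// in the real code by CarbonUtil.updateNullValueBasedOnDatatype).
val byteStream = new ByteArrayOutputStream()
val out = new DataOutputStream(byteStream)
out.writeInt(1)    // array header: a single default element
out.writeShort(2)  // nested struct header: two children (a, b)
// null markers for the primitive children a and b would follow here
out.flush()
val newColumnDefaultValue: Array[Byte] = byteStream.toByteArray
```
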
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 2328c75..674611d 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -76,7 +76,6 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
   @Override
   public void setParentName(String parentName) {
     this.parentName = parentName;
-
   }
 
   @Override
@@ -135,11 +134,15 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
       size = dataBuffer.array().length;
     } else if (child.getDataType() == DataTypes.TIMESTAMP) {
       size = DataTypes.LONG.getSizeInBytes();
+    } else if (dataBuffer.remaining() == DataTypes.INT.getSizeInBytes() && child.getDataType()
+        .equals(DataTypes.LONG)) {
+      // When datatype has been altered,
+      // get the actual data loaded size and then convert to long type.
+      size = DataTypes.INT.getSizeInBytes();
     } else {
       size = child.getDataType().getSizeInBytes();
     }
     actualData = getDataObject(dataBuffer, size);
-
     return actualData;
   }
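
The new size check covers reads after a datatype alter; a sketch in the style of the SI tests later in this patch (assuming a QueryTest-style `sql` helper):

```
sql("create table t (arr array<int>) stored as carbondata")
sql("insert into t select array(1, 2)")          // values loaded as 4-byte ints
sql("alter table t change arr arr array<long>")  // child datatype altered to long
sql("insert into t select array(26557544541)")   // new values loaded as 8 bytes
// pre-alter rows still hold 4 bytes per value, so the reader sizes the read
// by the loaded data and widens the result to long
sql("select * from t").show()
```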
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 0710920..e74b6ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -119,15 +119,18 @@ public class RestructureUtil {
         for (CarbonDimension tableDimension : tableComplexDimension) {
           if (isColumnMatches(isTransactionalTable, queryDimension.getDimension(),
               tableDimension)) {
-            ProjectionDimension currentBlockDimension = null;
+            ProjectionDimension currentBlockDimension;
             // If projection dimension is child of struct field and contains Parent Ordinal
             if (null != queryDimension.getDimension().getComplexParentDimension()) {
               currentBlockDimension = new ProjectionDimension(queryDimension.getDimension());
             } else {
               currentBlockDimension = new ProjectionDimension(tableDimension);
+              // for complex dimension update datatype, set scale and precision by traversing
+              // the child dimensions. Spark requires the resultant row with latest datatype,
+              // update the dimension here to collect the resultant rows with updated datatype.
+              fillNewDatatypesForComplexChildren(currentBlockDimension.getDimension(),
+                  queryDimension.getDimension());
             }
-            // TODO: for complex dimension set scale and precision by traversing
-            // the child dimensions
             currentBlockDimension.setOrdinal(queryDimension.getOrdinal());
             presentDimension.add(currentBlockDimension);
             isDimensionExists[dimIndex] = true;
@@ -165,6 +168,46 @@ public class RestructureUtil {
     return presentDimension;
   }
 
+  public static CarbonDimension getCarbonDimension(String columnId,
+      List<CarbonDimension> dimensions) {
+    CarbonDimension carbonDimension = null;
+    if (dimensions == null) {
+      return carbonDimension;
+    }
+    for (CarbonDimension dim : dimensions) {
+      if (dim.isComplex()) {
+        carbonDimension = getCarbonDimension(columnId, dim.getListOfChildDimensions());
+      } else if (dim.getColumnId().equalsIgnoreCase(columnId)) {
+        carbonDimension = dim;
+        break;
+      }
+    }
+    return carbonDimension;
+  }
+
+  public static void fillNewDatatypesForComplexChildren(CarbonDimension currentDimension,
+      CarbonDimension tableDimension) {
+    if (tableDimension.getListOfChildDimensions() == null) {
+      return;
+    }
+    for (CarbonDimension childDimension : tableDimension.getListOfChildDimensions()) {
+      if (childDimension.getDataType().isComplexType()) {
+        fillNewDatatypesForComplexChildren(currentDimension, childDimension);
+      } else {
+        // only in case of primitive types, datatype is allowed to be altered.
+        // Find and update the current block dimension datatype.
+        CarbonDimension currentChildDimension = getCarbonDimension(childDimension.getColumnId(),
+            currentDimension.getListOfChildDimensions());
+        if (currentChildDimension != null) {
+          ColumnSchema currentSchema = currentChildDimension.getColumnSchema();
+          currentSchema.setDataType(childDimension.getDataType());
+          currentSchema.setPrecision(childDimension.getColumnSchema().getPrecision());
+          currentSchema.setScale(childDimension.getColumnSchema().getScale());
+        }
+      }
+    }
+  }
+
   public static void fillExistingTableColumnIDMap(CarbonDimension tableColumn) {
     existingTableColumnIDMap.put(tableColumn.getColumnId(), tableColumn.getColName());
     List<CarbonDimension> children = tableColumn.getListOfChildDimensions();
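
getCarbonDimension above is a depth-first search over the dimension tree; a minimal stand-in (a simplified case class instead of CarbonDimension, names hypothetical):

```
// Simplified stand-in for CarbonDimension: a column id plus optional children.
case class Dim(columnId: String, children: List[Dim] = Nil)

// Mirror of getCarbonDimension: descend into complex (child-bearing)
// dimensions, match primitive leaves by column id; first hit wins.
def findDim(columnId: String, dims: List[Dim]): Option[Dim] =
  dims.view.flatMap { d =>
    if (d.children.nonEmpty) findDim(columnId, d.children)
    else if (d.columnId.equalsIgnoreCase(columnId)) Some(d)
    else None
  }.headOption
```
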
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c2d380d..17d5601 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -740,7 +740,14 @@ public final class DataTypeUtil {
         if (isEmptyByteArray(dataInBytes)) {
           return null;
         }
-        return ByteUtil.toXorLong(dataInBytes, 0, dataInBytes.length);
+        DataType blockDatatype;
+        if (dataInBytes.length == DataTypes.INT.getSizeInBytes()) {
+          blockDatatype = DataTypes.INT;
+        } else {
+          blockDatatype = DataTypes.LONG;
+        }
+        return getDataBasedOnRestructuredDataType(dataInBytes, blockDatatype, 0,
+            dataInBytes.length);
       } else if (actualDataType == DataTypes.TIMESTAMP) {
         if (isEmptyByteArray(dataInBytes)) {
           return null;
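
Condensed, the dispatch above keys the decode width off the stored byte length rather than the (possibly altered) schema datatype; a stand-in using plain big-endian reads in place of Carbon's ByteUtil decoding:

```
import java.nio.ByteBuffer

// A long column altered from int still holds 4-byte values in old blocks:
// choose the read width from the data, then widen to long.
def decodeAsLong(dataInBytes: Array[Byte]): Long =
  if (dataInBytes.length == java.lang.Integer.BYTES) {
    ByteBuffer.wrap(dataInBytes).getInt.toLong
  } else {
    ByteBuffer.wrap(dataInBytes).getLong
  }
```
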
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index dd09b91..b482ae5 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -775,10 +775,9 @@ CarbonData DDL statements are documented here,which includes:
      ```
      ALTER TABLE carbon ADD COLUMNS (a1 INT, b1 STRING) TBLPROPERTIES('DEFAULT.VALUE.a1'='10')
      ```
-      **NOTE:** Adding of only single-level Complex datatype columns(only array and struct) is supported.
-      Example - 
+
       ```
-      ALTER TABLE <table-name> ADD COLUMNS(arrField array<int>, structField struct<id1:string,name1:string>)
+      ALTER TABLE <table-name> ADD COLUMNS(arrField array<array<int>>, structField struct<id1:string,name1:string>, mapField map<string,array<string>>)
       ```
 Users can specify which columns to include and exclude for local dictionary generation after adding new columns. These will be appended with the already existing local dictionary include and exclude columns of main table respectively.
      
@@ -866,7 +865,36 @@ Users can specify which columns to include and exclude for local dictionary gene
      ALTER TABLE test_db.carbon CHANGE oldArray newArray array<int>
      ```
 
-     **NOTE:** Once the column is renamed, user has to take care about replacing the fileheader with the new name or changing the column header in csv file.
+     Example 7: Change column name in column: oldMapCol map\<int, int> from oldMapCol to newMapCol.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE oldMapCol newMapCol map<int, int>
+     ```
+
+     Example 8: Change child column type in column: structField struct\<id:int> from int to long.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE structField structField struct<id:long>
+     ```
+
+     Example 9: Change column name and type in column: oldArray array\<int> from oldArray to newArray and int to long.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE oldArray newArray array<long>
+     ```
+     Example 10: Change column name and type in column: oldMapCol map\<int, decimal(5,2)> from oldMapCol to newMapCol and decimal(5,2) to decimal(6,2).
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE oldMapCol newMapCol map<int, decimal(6,2)>
+     ```
+
+     Example 11: Change column name and type at a nested level of column: structField struct\<a:int,b:map\<int,int>> from b to b2 and int to long.
+
+     ```
+     ALTER TABLE test_db.carbon CHANGE structField structField struct<a:int,b2:map<int,long>>
+     ```
+
+     **NOTE:** Once a column is renamed, the user must replace the file header with the new name or change the column header in the CSV file.
    
    - #### MERGE INDEX
 
@@ -885,7 +913,6 @@ Users can specify which columns to include and exclude for local dictionary gene
      **NOTE:**
      * Merge index is supported on streaming table from carbondata 2.0.1 version.
      But streaming segments (ROW_V1) cannot create merge index.
-     * Rename column name is not supported for MAP type.
 
 
    - #### SET and UNSET
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 8f6aaad..5892ee7 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
-import scala.collection.mutable
+import scala.collection.mutable.WrappedArray.make
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
@@ -59,8 +59,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis')")
 
     checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
-      Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
-        mutable.WrappedArray.make(Array("iron", "man", "jarvis")))))
+      Seq(Row("4", make(Array("India")), "g", make(Array("iron", "man", "jarvis")))))
     val result1 = sql("select * from complextable where array_contains(arr2,'iron') and name='g'")
     val result2 = sql("select * from complextable where arr2[0]='iron' and name='f'")
     sql("create index index_11 on table complextable(arr2, name) as 'carbondata'")
@@ -87,6 +86,71 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     checkAnswer(result2, df2)
   }
 
+  test("Test restructured array<int> and existing string column as index columns on SI with compaction") {
+    sql("drop table if exists complextable")
+    sql("create table complextable (id string, name string, country array<string>) stored as carbondata")
+    sql("insert into complextable select 3,'f',array('china')")
+    sql("drop index if exists index_11 on complextable")
+    sql("ALTER TABLE complextable ADD COLUMNS(arr2 array<int>)")
+    sql("insert into complextable select 4,'g',array('India'),array(1)")
+    // change datatype
+    sql("alter table complextable change arr2 arr2 array<long>")
+    sql("insert into complextable select 3,'f',array('china'),array(26557544541,null)")
+    sql("insert into complextable select 4,'g',array('India'),array(26557544541,46557544541,3)")
+    checkAnswer(sql("select * from complextable where array_contains(arr2,3)"),
+      Seq(Row("4", "g", make(Array("India")), make(Array(26557544541L, 46557544541L, 3)))))
+    val result1 = sql("select * from complextable where array_contains(arr2,46557544541) and name='g'")
+    val result2 = sql("select * from complextable where arr2[0]=26557544541 and name='f'")
+    sql("create index index_11 on table complextable(arr2, name) as 'carbondata'")
+    sql("alter table complextable compact 'minor'")
+    val df1 = sql(" select * from complextable where array_contains(arr2,46557544541) and name='g'")
+    val df2 = sql(" select * from complextable where arr2[0]=26557544541 and name='f'")
+    if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    val doNotHitSIDf = sql(
+      " select * from complextable where array_contains(arr2,26557544541) and array_contains" +
+      "(arr2,46557544541)")
+    if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result1, df1)
+    checkAnswer(result2, df2)
+  }
+
+  test("Test restructured array<timestamp> as index column on SI with compaction") {
+    sql("drop table if exists complextable")
+    sql("create table complextable (name string, time date) stored as carbondata")
+    sql("insert into complextable select 'b', '2017-02-01'")
+    sql("ALTER TABLE complextable ADD COLUMNS(projectdate array<timestamp>)")
+    sql("insert into complextable select 'b', '2017-02-01',array('2017-02-01 00:01:00','')")
+    sql("drop index if exists index_1 on complextable")
+    sql("insert into complextable select 'b', '2017-02-01',array('2017-02-01 00:01:00','2018-02-01 02:00:00')")
+    sql("insert into complextable select 'b', '2017-02-01',array(null,'2018-02-01 02:00:00')")
+    sql("insert into complextable select 'b', '2017-02-01',null")
+    val result = sql(" select * from complextable where array_contains(projectdate,cast('2017-02-01 00:01:00' as timestamp))")
+    sql("create index index_1 on table complextable(projectdate) as 'carbondata'")
+    sql("alter table complextable compact 'minor'")
+    val df = sql(" select * from complextable where array_contains(projectdate,cast('2017-02-01 00:01:00' as timestamp))")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+
   test("Test restructured array<string> and string columns as index columns on SI with compaction") {
     sql("drop table if exists complextable")
     sql("create table complextable (id string, country array<string>, name string) stored as carbondata")
@@ -102,8 +166,7 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     sql("insert into complextable select 4,array('India'),'g',array('iron','man','jarvis'),'India'")
 
     checkAnswer(sql("select * from complextable where array_contains(arr2,'iron')"),
-      Seq(Row("4", mutable.WrappedArray.make(Array("India")), "g",
-        mutable.WrappedArray.make(Array("iron", "man", "jarvis")), "India")))
+      Seq(Row("4", make(Array("India")), "g", make(Array("iron", "man", "jarvis")), "India")))
     val result1 = sql("select * from complextable where array_contains(arr2,'iron') and addr='India'")
     val result2 = sql("select * from complextable where arr2[0]='iron' and addr='china'")
     sql("create index index_11 on table complextable(arr2, addr) as 'carbondata'")
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index d8ff50f..0640af0 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -291,8 +291,13 @@ public class SecondaryIndexQueryResultProcessor {
       } else {
         if (isComplexColumn && complexColumnParentBlockIndexes.length == 0) {
           // After restructure some complex column will not be present in parent block.
-          // In such case, set the SI implicit row value to empty byte array.
-          preparedRow[i] = new byte[0];
+          // In such case, set the SI implicit row value to null or empty byte array.
+          if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
+            // set null value for measures
+            preparedRow[i] = null;
+          } else {
+            preparedRow[i] = new byte[0];
+          }
         } else if (isComplexColumn) {
           // get the flattened data of complex column
           byte[] complexKeyByIndex = wrapper.getComplexKeyByIndex(complexIndex);
@@ -362,8 +367,14 @@ public class SecondaryIndexQueryResultProcessor {
    */
   private Object getData(Object[] data, int index, DataType dataType) {
     if (data == null || data.length == 0) {
+      if (DataTypeUtil.isPrimitiveColumn(dataType)) {
+        return null;
+      }
       return new byte[0];
     } else if (data[0] == null) {
+      if (DataTypeUtil.isPrimitiveColumn(dataType)) {
+        return null;
+      }
       return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
     }
     if (dataType == DataTypes.TIMESTAMP && null != data[index]) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index cee16d7..7a5f53f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -1124,26 +1124,19 @@ object CarbonParserUtil {
     val dataTypeName = DataTypeConverterUtil.convertToCarbonType(complexField).getName
     val dataTypeInfo = CarbonParserUtil.parseDataType(columnName, dataTypeName.toLowerCase, values)
     complexField.dataType match {
-      case Some(CarbonCommonConstants.ARRAY) =>
-        val childField = complexField.children.get(0)
-        val childType = childField.dataType
-        val childName = columnName + CarbonCommonConstants.POINT + childField.name
-        val childValues = childType match {
-          case d: DecimalType => Some(List((d.precision, d.scale)))
-          case _ => None
-        }
-        val childDatatypeInfo = parseDataType(childName, childField, childValues)
-        dataTypeInfo.setChildren(List(childDatatypeInfo))
-      case Some(CarbonCommonConstants.STRUCT) =>
+      case Some(CarbonCommonConstants.ARRAY) | Some(CarbonCommonConstants.STRUCT) |
+           Some(CarbonCommonConstants.MAP) =>
         var childTypeInfoList: List[DataTypeInfo] = null
         for (childField <- complexField.children.get) {
           val childType = childField.dataType
-          val childName = columnName + CarbonCommonConstants.POINT + childField.name.get
-          val childValues = childType match {
-            case d: DecimalType => Some(List((d.precision, d.scale)))
-            case _ => None
+          val childName = columnName + CarbonCommonConstants.POINT + childField.column
+          val decimalValues = if (childType.get.contains(CarbonCommonConstants.DECIMAL)) {
+            // If datatype is decimal, extract its precision and scale.
+            Some(List(CommonUtil.getScaleAndPrecision(childType.get)))
+          } else {
+            None
           }
-          val childDatatypeInfo = parseDataType(childName, childField, childValues)
+          val childDatatypeInfo = parseDataType(childName, childField, decimalValues)
           if (childTypeInfoList == null) {
             childTypeInfoList = List(childDatatypeInfo)
           } else {
@@ -1153,7 +1146,6 @@ object CarbonParserUtil {
         dataTypeInfo.setChildren(childTypeInfoList)
       case _ =>
     }
-    // TODO have to handle for map types [CARBONDATA-4199]
     dataTypeInfo
   }
 
@@ -1210,22 +1202,16 @@ object CarbonParserUtil {
       case arrayType: ArrayType =>
         val childType: DataType = arrayType.elementType
         val childName = columnName + ".val"
-        val childValues = childType match {
-          case d: DecimalType => Some(List((d.precision, d.scale)))
-          case _ => None
-        }
-        val childDatatypeInfo = parseColumn(childName, childType, childValues)
+        val decimalValues = getDecimalValues(childType)
+        val childDatatypeInfo = parseColumn(childName, childType, decimalValues)
         dataTypeInfo.setChildren(List(childDatatypeInfo))
       case structType: StructType =>
         var childTypeInfoList: List[DataTypeInfo] = null
         for (childField <- structType) {
           val childType = childField.dataType
           val childName = columnName + CarbonCommonConstants.POINT + childField.name
-          val childValues = childType match {
-            case d: DecimalType => Some(List((d.precision, d.scale)))
-            case _ => None
-          }
-          val childDatatypeInfo = CarbonParserUtil.parseColumn(childName, childType, childValues)
+          val decimalValues = getDecimalValues(childType)
+          val childDatatypeInfo = CarbonParserUtil.parseColumn(childName, childType, decimalValues)
           if (childTypeInfoList == null) {
             childTypeInfoList = List(childDatatypeInfo)
           } else {
@@ -1233,12 +1219,33 @@ object CarbonParserUtil {
           }
         }
         dataTypeInfo.setChildren(childTypeInfoList)
+      case mapType: MapType =>
+        val keyType: DataType = mapType.keyType
+        val valType: DataType = mapType.valueType
+        var childTypeInfoList: List[DataTypeInfo] = List()
+        val childName1 = columnName + ".key"
+        val childName2 = columnName + ".value"
+        val keyTypeDecimalValues = getDecimalValues(keyType)
+        val valTypeDecimalValues = getDecimalValues(valType)
+        val childDatatypeInfo1 = CarbonParserUtil.parseColumn(childName1,
+          keyType, keyTypeDecimalValues)
+        val childDatatypeInfo2 = CarbonParserUtil.parseColumn(childName2,
+          valType, valTypeDecimalValues)
+        childTypeInfoList = childTypeInfoList :+ childDatatypeInfo1
+        childTypeInfoList = childTypeInfoList :+ childDatatypeInfo2
+        dataTypeInfo.setChildren(childTypeInfoList)
       case _ =>
     }
-    // TODO have to handle for map types [CARBONDATA-4199]
     dataTypeInfo
   }
 
+  def getDecimalValues(inputType: DataType): Option[List[(Int, Int)]] = {
+    inputType match {
+      case d: DecimalType => Some(List((d.precision, d.scale)))
+      case _ => None
+    }
+  }
+
   def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = {
     defaultValueColumnName.equalsIgnoreCase("default.value." + fieldName)
   }
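
For a map column, the parser above always emits exactly two children named `<column>.key` and `<column>.value`, and only decimal children carry precision and scale, extracted via getDecimalValues:

```
import org.apache.spark.sql.catalyst.CarbonParserUtil
import org.apache.spark.sql.types.DecimalType

// For a column `m map<string, decimal(5,2)>`, parseColumn produces children
// "m.key" and "m.value"; the decimal value side carries precision and scale.
val decimalValues = CarbonParserUtil.getDecimalValues(DecimalType(5, 2))
assert(decimalValues == Some(List((5, 2))))
```
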
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 6c6030d..a00e58e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -244,11 +244,6 @@ class AlterTableColumnSchemaGenerator(
   }
 
   private def createChildSchema(childField: Field, currentSchemaOrdinal: Int): ColumnSchema = {
-    // TODO: should support adding multi-level complex columns: CARBONDATA-4164
-    if (!childField.children.contains(null)) {
-      throw new UnsupportedOperationException(
-        "Alter add columns with nested complex types is not allowed")
-    }
 
     TableNewProcessor.createColumnSchema(
       childField,
@@ -263,18 +258,46 @@ class AlterTableColumnSchemaGenerator(
       isVarcharColumn(childField.name.getOrElse(childField.column)))
   }
 
+  def addComplexChildCols(currField: Field,
+      currColumnSchema: ColumnSchema,
+      newCols: mutable.Buffer[ColumnSchema],
+      allColumns: mutable.Buffer[ColumnSchema],
+      longStringCols: mutable.Buffer[ColumnSchema],
+      currentSchemaOrdinal: Int): Unit = {
+    if (currField.children.get == null || currField.children.isEmpty) {
+      return
+    }
+    if (currColumnSchema.getDataType.isComplexType) {
+      val noOfChildren = currField.children.get.size
+      currColumnSchema.setNumberOfChild(noOfChildren)
+      currField.children.get.foreach(childField => {
+        val childSchema: ColumnSchema = createChildSchema(childField, currentSchemaOrdinal)
+        if (childSchema.getDataType == DataTypes.VARCHAR) {
+          // put the new long string columns in 'longStringCols'
+          // and add them after old long string columns
+          longStringCols ++= Seq(childSchema)
+        } else {
+          allColumns ++= Seq(childSchema)
+        }
+        newCols ++= Seq(childSchema)
+        addComplexChildCols(childField, childSchema, newCols, allColumns, longStringCols,
+          currentSchemaOrdinal)
+      })
+    }
+  }
+
   def process: Seq[ColumnSchema] = {
     val tableSchema = tableInfo.getFactTable
     val tableCols = tableSchema.getListOfColumns.asScala
     // previous maximum column schema ordinal + 1 is the current column schema ordinal
     val currentSchemaOrdinal = tableCols.map(col => col.getSchemaOrdinal).max + 1
-    var longStringCols = Seq[ColumnSchema]()
+    val longStringCols = mutable.Buffer[ColumnSchema]()
     // get all original dimension columns
     // but exclude complex type columns and long string columns
     var allColumns = tableCols.filter(x =>
       (x.isDimensionColumn && !x.getDataType.isComplexType() && !x.isComplexColumn()
        && x.getSchemaOrdinal != -1 && (x.getDataType != DataTypes.VARCHAR)))
-    var newCols = Seq[ColumnSchema]()
+    val newCols = mutable.Buffer[ColumnSchema]()
     val invertedIndexCols: Array[String] = alterTableModel
       .tableProperties
       .get(CarbonCommonConstants.INVERTED_INDEX)
@@ -283,10 +306,6 @@ class AlterTableColumnSchemaGenerator(
 
     // add new dimension columns
     alterTableModel.dimCols.foreach(field => {
-      if (field.dataType.get.toLowerCase().equals(CarbonCommonConstants.MAP)) {
-        throw new MalformedCarbonCommandException(
-          s"Add column is unsupported for map datatype column: ${ field.column }")
-      }
       val encoders = new java.util.ArrayList[Encoding]()
       val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
         field,
@@ -307,34 +326,8 @@ class AlterTableColumnSchemaGenerator(
         allColumns ++= Seq(columnSchema)
       }
       newCols ++= Seq(columnSchema)
-      if (DataTypes.isArrayType(columnSchema.getDataType)) {
-        columnSchema.setNumberOfChild(field.children.size)
-        val childField = field.children.get(0)
-        val childSchema: ColumnSchema = createChildSchema(childField, currentSchemaOrdinal)
-        if (childSchema.getDataType == DataTypes.VARCHAR) {
-          // put the new long string columns in 'longStringCols'
-          // and add them after old long string columns
-          longStringCols ++= Seq(childSchema)
-        } else {
-          allColumns ++= Seq(childSchema)
-        }
-        newCols ++= Seq(childSchema)
-      } else if (DataTypes.isStructType(columnSchema.getDataType)) {
-        val noOfChildren = field.children.get.size
-        columnSchema.setNumberOfChild(noOfChildren)
-        for (i <- 0 to noOfChildren - 1) {
-          val childField = field.children.get(i)
-          val childSchema: ColumnSchema = createChildSchema(childField, currentSchemaOrdinal)
-          if (childSchema.getDataType == DataTypes.VARCHAR) {
-            // put the new long string columns in 'longStringCols'
-            // and add them after old long string columns
-            longStringCols ++= Seq(childSchema)
-          } else {
-            allColumns ++= Seq(childSchema)
-          }
-          newCols ++= Seq(childSchema)
-        }
-      }
+      addComplexChildCols(field, columnSchema, newCols, allColumns, longStringCols,
+        currentSchemaOrdinal)
     })
     // put the old long string columns
     allColumns ++= tableCols.filter(x =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 44b03ed..01491fa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -38,8 +38,8 @@ import org.apache.carbondata.core.metadata.datatype.{DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.events.{AlterTableColRenameAndDataTypeChangePostEvent, AlterTableColRenameAndDataTypeChangePreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.util.DataTypeConverterUtil
+import org.apache.carbondata.format.{ColumnSchema, DataType, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 
 abstract class CarbonAlterTableColumnRenameCommand(oldColumnName: String, newColumnName: String)
   extends MetadataCommand {
@@ -81,6 +81,9 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
   // stores mapping of altered column names: old-column-name -> new-column-name.
   // Including both parent/table and children columns
   val alteredColumnNamesMap = collection.mutable.LinkedHashMap.empty[String, String]
+  // stores mapping of altered column data types: old-column-name -> new-column-datatype.
+  // Including both parent/table and children columns
+  val alteredDatatypesMap = collection.mutable.LinkedHashMap.empty[String, String]
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -150,11 +153,6 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       // set isDataTypeChange flag
       val oldDatatype = oldCarbonColumn.head.getDataType
       val newDatatype = alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.dataType
-      if (isColumnRename && (DataTypes.isMapType(oldDatatype) ||
-                             newDatatype.equalsIgnoreCase(CarbonCommonConstants.MAP))) {
-        throw new UnsupportedOperationException(
-          "Alter rename is unsupported for Map datatype column")
-      }
       if (oldDatatype.getName.equalsIgnoreCase(newDatatype)) {
         val newColumnPrecision =
           alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.precision
@@ -169,13 +167,13 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
              newColumnScale)) {
           isDataTypeChange = true
         }
-        if (DataTypes.isArrayType(oldDatatype) || DataTypes.isStructType(oldDatatype)) {
+        if (oldDatatype.isComplexType) {
           val oldParent = oldCarbonColumn.head
           val oldChildren = oldParent.asInstanceOf[CarbonDimension].getListOfChildDimensions.asScala
             .toList
           AlterTableUtil.validateComplexStructure(oldChildren,
             alterTableColRenameAndDataTypeChangeModel.dataTypeInfo.getChildren(),
-            alteredColumnNamesMap)
+            alteredColumnNamesMap, alteredDatatypesMap)
         }
       } else {
         if (oldDatatype.isComplexType) {
@@ -188,7 +186,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       // If there is no columnrename and datatype change and comment change
       // return directly without execution
       if (!isColumnRename && !isDataTypeChange && !newColumnComment.isDefined &&
-          alteredColumnNamesMap.isEmpty) {
+          alteredColumnNamesMap.isEmpty && alteredDatatypesMap.isEmpty) {
         return Seq.empty
       }
       // if column datatype change operation is on partition column, then fail the
@@ -273,26 +271,41 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
           }
           addedTableColumnSchema = columnSchema
         } else if (isComplexChild(columnSchema)) {
-          if (alteredColumnNamesMap.contains(columnName)) {
-            // matches exactly
-            val newComplexChildName = alteredColumnNamesMap(columnName)
-            columnSchema.setColumn_name(newComplexChildName)
-            isSchemaEntryRequired = true
-          } else {
-            val alteredParent = checkIfParentIsAltered(columnName)
-            /*
-             * Lets say, if complex schema is: str struct<a: int>
-             * and if parent column is changed from str -> str2
-             * then its child name should also be changed from str.a -> str2.a
-             */
-            if (alteredParent != null) {
-              val newParent = alteredColumnNamesMap(alteredParent)
-              val newComplexChildName = newParent + columnName
-                .split(alteredParent)(1)
+          // check if name is altered
+          if (!alteredColumnNamesMap.isEmpty) {
+            if (alteredColumnNamesMap.contains(columnName)) {
+              // matches exactly
+              val newComplexChildName = alteredColumnNamesMap(columnName)
               columnSchema.setColumn_name(newComplexChildName)
               isSchemaEntryRequired = true
+            } else {
+              val alteredParent = checkIfParentIsAltered(columnName)
+              /*
+               * Lets say, if complex schema is: str struct<a: int>
+               * and if parent column is changed from str -> str2
+               * then its child name should also be changed from str.a -> str2.a
+               */
+              if (alteredParent != null) {
+                val newParent = alteredColumnNamesMap(alteredParent)
+                val newComplexChildName = newParent + columnName
+                  .split(alteredParent)(1)
+                columnSchema.setColumn_name(newComplexChildName)
+                isSchemaEntryRequired = true
+              }
             }
           }
+          // check if datatype is altered
+          if (!alteredDatatypesMap.isEmpty && alteredDatatypesMap.get(columnName) != None) {
+            val newDatatype = alteredDatatypesMap.get(columnName).get
+            if (newDatatype.equals(CarbonCommonConstants.LONG)) {
+              columnSchema.setData_type(DataType.LONG)
+            } else if (newDatatype.contains(CarbonCommonConstants.DECIMAL)) {
+              val (newPrecision, newScale) = CommonUtil.getScaleAndPrecision(newDatatype)
+              columnSchema.setPrecision(newPrecision)
+              columnSchema.setScale(newScale)
+            }
+            isSchemaEntryRequired = true
+          }
         }
 
         // make a new schema evolution entry after column rename or datatype change
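
End to end, the two maps drive statements like the following (taken from the rename test cases later in this patch), where a child rename and a datatype widening happen in one command:

```
// child b is renamed to b2 and widened from int to long in one statement;
// the rename lands in alteredColumnNamesMap and the widening in
// alteredDatatypesMap, both keyed by the child's full column name
sql("alter table test_rename change map4 map4 map<string,struct<b2:long>>")
```
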
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 3250f7a..f7c56ba 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.core.index.IndexStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema}
@@ -1088,7 +1088,8 @@ object AlterTableUtil {
    */
   def validateComplexStructure(oldDimensionList: List[CarbonDimension],
       newDimensionList: List[DataTypeInfo],
-      alteredColumnNamesMap: mutable.LinkedHashMap[String, String]): Unit = {
+      alteredColumnNamesMap: mutable.LinkedHashMap[String, String],
+      alteredDatatypesMap: mutable.LinkedHashMap[String, String]): Unit = {
     if (oldDimensionList == null && newDimensionList == null) {
       throw new UnsupportedOperationException("Both old and new dimensions are null")
     } else if (oldDimensionList == null || newDimensionList == null) {
@@ -1104,30 +1105,43 @@ object AlterTableUtil {
         val old_column_datatype = oldDimensionInfo.getDataType.getName
         val new_column_name = newDimensionInfo
           .columnName.split(CarbonCommonConstants.POINT.toCharArray).last
-        val new_column_datatype = newDimensionInfo.dataType
+        var new_column_datatype = newDimensionInfo.dataType
+
+        // check if column datatypes are altered. If altered, validate them
         if (!old_column_datatype.equalsIgnoreCase(new_column_datatype)) {
-          // datatypes of complex children cannot be altered. So throwing exception for now.
-          throw new UnsupportedOperationException(
-            "Altering datatypes of any child column is not supported")
+          this.validateColumnDataType(newDimensionInfo, oldDimensionInfo)
+          alteredDatatypesMap += (oldDimensionInfo.getColName -> new_column_datatype)
+        } else if (old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.DECIMAL) &&
+                   old_column_datatype.equalsIgnoreCase(new_column_datatype)) {
+          val oldPrecision = oldDimensionInfo.getDataType().asInstanceOf[DecimalType].getPrecision
+          val oldScale = oldDimensionInfo.getDataType().asInstanceOf[DecimalType].getScale
+          if (oldPrecision != newDimensionInfo.precision || oldScale != newDimensionInfo.scale) {
+            this.validateColumnDataType(newDimensionInfo, oldDimensionInfo)
+            new_column_datatype = "decimal(" + newDimensionInfo.precision + "," +
+                                  newDimensionInfo.scale + ")"
+            alteredDatatypesMap += (oldDimensionInfo.getColName -> new_column_datatype)
+          }
         }
+
+        // check if column names are altered
         if (!old_column_name.equalsIgnoreCase(new_column_name)) {
           alteredColumnNamesMap += (oldDimensionInfo.getColName -> newDimensionInfo.columnName)
         }
-        if (old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.MAP) ||
-            new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.MAP)) {
-          throw new UnsupportedOperationException(
-            "Cannot alter complex structure that includes map type column")
-        } else if (new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
-                   old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
-                   new_column_datatype.equalsIgnoreCase(CarbonCommonConstants.STRUCT) ||
-                   old_column_datatype.equalsIgnoreCase(CarbonCommonConstants.STRUCT)) {
+        if (isComplexType(new_column_datatype) || isComplexType(old_column_datatype)) {
           validateComplexStructure(oldDimensionInfo.getListOfChildDimensions.asScala.toList,
-            newDimensionInfo.getChildren(), alteredColumnNamesMap)
+            newDimensionInfo.getChildren(), alteredColumnNamesMap, alteredDatatypesMap)
         }
       }
     }
   }
 
+  // To identify if the datatype name is of complex type.
+  def isComplexType(dataTypeName: String): Boolean = {
+    dataTypeName.equalsIgnoreCase(CarbonCommonConstants.ARRAY) ||
+    dataTypeName.equalsIgnoreCase(CarbonCommonConstants.STRUCT) ||
+    dataTypeName.equalsIgnoreCase(CarbonCommonConstants.MAP)
+  }
+
   /**
    * This method will validate a column for its data type and check whether the column data type
    * can be modified and update if conditions are met.
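
Per this validation (and the error text asserted in the rename tests), only int widening and decimal precision/scale growth are accepted for complex children; a sketch, assuming map1 was created as map<string, string>:

```
// accepted: int child widened to long
sql("ALTER TABLE test_db.carbon CHANGE structField structField struct<id:long>")
// accepted: decimal(5,2) child widened to decimal(6,2)
sql("ALTER TABLE test_db.carbon CHANGE oldMapCol oldMapCol map<int, decimal(6,2)>")
// rejected: a string child cannot be modified; fails with
// "Only Int and Decimal data types are allowed for modification"
sql("ALTER TABLE test_db.carbon CHANGE map1 map1 map<string, int>")
```
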
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index 8f3497b..4b412cb 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters
 import scala.collection.mutable
+import scala.collection.mutable.WrappedArray.make
 
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
@@ -214,6 +215,28 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_com")
   }
 
+  test("Test adding of map of all primitive datatypes") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql("CREATE TABLE alter_com(intfield int) STORED AS carbondata")
+    sql("ALTER TABLE alter_com ADD COLUMNS(map1 Map<short,int>, map2 Map<long,double>, " +
+        "map3 Map<decimal(3,2),string>, map4 Map<char(5),varchar(50)>, map5 Map<boolean,date>, " +
+        "map6 Map<string,timestamp>)")
+    sql("insert into alter_com values(1, map(1,2),map(3,2.34), map(1.23,'hello')," +
+        "map('abc','def'), map(true,'2017-02-01')," + "map('time','2018-02-01 02:00:00.0')) ")
+    sql("select * from alter_com").show(false)
+    checkAnswer(sql("select * from alter_com"),
+      Seq(Row(1,
+        Map(1 -> 2),
+        Map(3 -> 2.34),
+        Map(java.math.BigDecimal.valueOf(1.23).setScale(2) -> "hello"),
+        Map("abc" -> "def"),
+        Map(true -> Date.valueOf("2017-02-01")),
+        Map("time" -> Timestamp.valueOf("2018-02-01 02:00:00.0")))))
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+    assert(addedColumns.size == 6)
+  }
+
   test("Test alter add complex type and compaction") {
     sql("DROP TABLE IF EXISTS alter_com")
     sql("create table alter_com (a int, b string, arr1 array<string>) stored as carbondata")
@@ -271,7 +294,7 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
       sql("insert into alter_struct values(1, named_struct('id1', 'id1','name1','name1'))")
       sql("ALTER TABLE alter_struct ADD COLUMNS(struct1 struct<a:string,b:string>, temp string," +
           " intField int, struct2 struct<c:string,d:string,e:int>, arr array<int>) TBLPROPERTIES " +
-          "('LOCAL_DICTIONARY_INCLUDE'='struct1, struct2')")
+          s"('$dictionary'='struct1, struct2')")
       val schema = sql("describe alter_struct").collect()
       assert(schema.size == 7)
     }
@@ -288,7 +311,6 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql(
       "insert into alter_com values(2,array(9,0),array(1,2,3),array('hello','world'),array(6,7)," +
       "array(8,9), named_struct('a',1,'b','abcde') )")
-    sql("select * from alter_com").show(false)
     checkAnswer(sql("select * from alter_com where array_contains(arr4,6)"),
       Seq(Row(2,
         make(Array(9, 0)),
@@ -354,22 +376,46 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS alter_struct")
   }
 
-  test("Validate alter add multi-level complex column") {
+  test("test alter add multi-level complex columns") {
     sql("DROP TABLE IF EXISTS alter_com")
-    sql("CREATE TABLE alter_com(intField INT, arr array<int>) " +
-      "STORED AS carbondata ")
-    var exception = intercept[Exception] {
-      sql("ALTER TABLE alter_com ADD COLUMNS(arr1 array<array<int>>) ")
-    }
-    val exceptionMessage =
-      "operation failed for default.alter_com: Alter table add operation failed: Alter add " +
-      "columns with nested complex types is not allowed"
-    assert(exception.getMessage.contains(exceptionMessage))
-
-    exception = intercept[Exception] {
-      sql("ALTER TABLE alter_com ADD COLUMNS(struct1 struct<arr: array<int>>) ")
-    }
-    assert(exception.getMessage.contains(exceptionMessage))
+    sql("CREATE TABLE alter_com(intField INT) STORED AS carbondata ")
+    sql("insert into alter_com values(1)")
+    // multi-level nested array
+    sql(
+      "ALTER TABLE alter_com ADD COLUMNS(arr1 array<array<int>>, arr2 array<struct<a1:string, " +
+      "map1:Map<string, string>>>) ")
+    sql(
+      "insert into alter_com values(1, array(array(1,2)), array(named_struct('a1','st','map1', " +
+      "map('a','b'))))")
+    // multi-level nested struct
+    sql("ALTER TABLE alter_com ADD COLUMNS(struct1 struct<s1:string, arr: array<int>>," +
+        " struct2 struct<num:double,contact:map<string,array<int>>>) ")
+    sql("insert into alter_com values(1, " +
+        "array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
+        "named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
+        "array(1,2))))")
+    // multi-level nested map
+    sql(
+      "ALTER TABLE alter_com ADD COLUMNS(map1 map<string,array<string>>, map2 map<string," +
+      "struct<d:int, s:struct<im:string>>>)")
+    sql("insert into alter_com values(1,  " +
+        "array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
+        "named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
+        "array(1,2))),map('a',array('hi')), map('a',named_struct('d',23,'s',named_struct('im'," +
+        "'sh'))))")
+    sql("alter table alter_com compact 'minor'")
+    checkAnswer(sql("select * from alter_com"),
+      Seq(Row(1, null, null, null, null, null, null),
+        Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+          null, null, null, null),
+        Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+          Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))), null, null),
+        Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+          Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))),
+          Map("a" -> make(Array("hi"))), Map("a" -> Row(23, Row("sh"))))
+      ))
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_com")
+    assert(addedColumns.size == 6)
     sql("DROP TABLE IF EXISTS alter_com")
   }
 
@@ -389,21 +435,6 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     assert(exception.getMessage.contains(exceptionMessage))
   }
 
-  test("Validate adding of map types through alter command") {
-    sql("DROP TABLE IF EXISTS alter_com")
-    sql(
-      "CREATE TABLE alter_com(doubleField double, arr1 array<long>, m map<int, string> ) STORED " +
-      "AS carbondata")
-    sql("insert into alter_com values(1.1,array(77),map(1,'abc'))")
-    val exception = intercept[Exception] {
-      sql("ALTER TABLE alter_com ADD COLUMNS(mapField map<int, string>)")
-    }
-    val exceptionMessage =
-      "operation failed for default.alter_com: Alter table add operation failed: Add column is " +
-      "unsupported for map datatype column: mapfield"
-    assert(exception.getMessage.contains(exceptionMessage))
-  }
-
   test("alter table add complex columns with comment") {
     sql("""create table test_add_column_with_comment(
           | col1 string comment 'col1 comment',
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index c54eb9a..cc7152b 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -83,13 +83,6 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
       "CREATE TABLE test_rename (str struct<a:int,b:long>, str2 struct<a:int,b:long>, map1 " +
       "map<string, string>, str3 struct<a:int, b:map<string, string>>) STORED AS carbondata")
 
-    val ex1 = intercept[ProcessMetaDataException] {
-      sql("alter table test_rename change str str struct<a:array<int>,b:long>")
-    }
-    assert(ex1.getMessage
-      .contains(
-        "column rename operation failed: Altering datatypes of any child column is not supported"))
-
     val ex2 = intercept[ProcessMetaDataException] {
       sql("alter table test_rename change str str struct<a:int,b:long,c:int>")
     }
@@ -125,14 +118,10 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
       sql("alter table test_rename change map1 map2 map<string, struct<a:int>>")
     }
     assert(ex6.getMessage
-      .contains("rename operation failed: Alter rename is unsupported for Map datatype column"))
-
-    val ex7 = intercept[ProcessMetaDataException] {
-      sql("alter table test_rename change str3 str33 struct<a:int, bc:map<string, string>>")
-    }
-    assert(ex7.getMessage
       .contains(
-        "rename operation failed: Cannot alter complex structure that includes map type column"))
+        "operation failed for default.test_rename: Alter table data type change or column rename " +
+        "operation failed: Given column map1.val.value with data type STRING cannot be modified. " +
+        "Only Int and Decimal data types are allowed for modification"))
 
     // ensure all failed rename operations have been reverted to original state
     val describe = sql("desc table test_rename")
@@ -307,17 +296,128 @@ class AlterTableColumnRenameTestCase extends QueryTest with BeforeAndAfterAll {
           make(Array("hello11", "world11")), make(Array(4555)))))
   }
 
-  test("validate alter change datatype for complex children columns") {
+  test("test alter rename and change datatype for map of (primitive/array/struct)") {
     sql("drop table if exists test_rename")
     sql(
-      "CREATE TABLE test_rename (str struct<a:int,b:long>) STORED AS carbondata")
-
-    val ex1 = intercept[ProcessMetaDataException] {
-      sql("alter table test_rename change str str struct<a:long,b:long>")
-    }
-    assert(ex1.getMessage
-      .contains(
-        "column rename operation failed: Altering datatypes of any child column is not supported"))
+      "CREATE TABLE test_rename (map1 map<int,int>, map2 map<string,array<int>>, " +
+      "map3 map<int, map<string,int>>, map4 map<string,struct<b:int>>) STORED AS carbondata")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+      "map(2,map('hello',1)), map('hi',named_struct('b',3)))")
+    // rename parent column from map1 to map11 and read old rows
+    sql("alter table test_rename change map1 map11 map<int,int>")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+      "map(2,map('hello',1)), map('hi',named_struct('b',3)))")
+    checkAnswer(sql("select map11 from test_rename"), Seq(Row(Map(1 -> 2)),
+      Row(Map(1 -> 2))))
+    // rename parent column from map2 to map22 and read old rows
+    sql("alter table test_rename change map2 map22 map<string,array<int>>")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+      "map(2,map('hello',1)), map('hi',named_struct('b',3)))")
+    checkAnswer(sql("select map22 from test_rename"), Seq(Row(Map("a" -> make(Array(1, 2)))),
+      Row(Map("a" -> make(Array(1, 2)))), Row(Map("a" -> make(Array(1, 2))))))
+    // rename child column and change datatype
+    sql("alter table test_rename change map4 map4 map<string,struct<b2:long>>")
+    sql("insert into test_rename values (map(1,2), map('a',array(1,2)), " +
+        "map(2,map('hello',1)), map('hi',named_struct('b',26557544541)))")
+    checkAnswer(sql("describe test_rename"),
+      Seq(Row("map11", "map<int,int>", null),
+        Row("map22", "map<string,array<int>>", null),
+        Row("map3", "map<int,map<string,int>>", null),
+        Row("map4", "map<string,struct<b2:bigint>>", null)))
+    checkAnswer(sql("select map4['hi']['b2'] from test_rename"),
+      Seq(Row(3), Row(3), Row(3), Row(26557544541L)))
+  }
+
+  test("test alter rename and change datatype for struct integer") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (str struct<a:int>) STORED AS carbondata")
+    sql("insert into test_rename values(named_struct('a', 1234))")
+    sql("insert into test_rename values(named_struct('a', 3456))")
+    // only rename operation
+    sql("alter table test_rename change str str1 struct<a1:int>")
+    // both rename and change datatype operation
+    sql("alter table test_rename change str1 str1 struct<a2:long>")
+    sql("insert into test_rename values(named_struct('a2', 26557544541))")
+    // rename both parent and child columns
+    sql("alter table test_rename change str1 str2 struct<a3:long>")
+    sql("insert into test_rename values(named_struct('a3', 26557544541))")
+    checkAnswer(sql("describe test_rename"), Seq(Row("str2", "struct<a3:bigint>", null)))
+    checkAnswer(sql("select str2 from test_rename"),
+      Seq(Row(Row(1234L)), Row(Row(3456L)), Row(Row(26557544541L)), Row(Row(26557544541L))))
+  }
+
+  test("test alter rename and change datatype for map integer") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (name string,mapField1 MAP<int, int>) STORED AS carbondata")
+    sql("insert into test_rename values('a',map(1,2))")
+    sql("insert into test_rename values('v',map(3,4))")
+    sql(s"create index si_1 on test_rename(name) as 'carbondata'")
+    // only rename operation
+    sql("alter table test_rename change mapField1 mapField2 MAP<int, int>")
+    sql("insert into test_rename values('df',map(5, 6))")
+    // both rename and change datatype operation
+    sql("alter table test_rename change mapField2 mapField3 MAP<int, long>")
+    sql("insert into test_rename values('sdf',map(7, 26557544541))")
+    sql("describe test_rename").show(false)
+    checkAnswer(sql("describe test_rename"),
+      Seq(Row("name", "string", null), Row("mapfield3", "map<int,bigint>", null)))
+    checkAnswer(sql("select mapField3 from test_rename"),
+      Seq(Row(Map(1 -> 2L)), Row(Map(3 -> 4L)), Row(Map(5 -> 6L)), Row(Map(7 -> 26557544541L))))
+  }
+
+  test("test alter rename and change datatype for array integer") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (arr array<int>) STORED AS carbondata")
+    sql("insert into test_rename values(array(1,2,3))")
+    sql("insert into test_rename values(array(4,5,6))")
+    // only rename operation
+    sql("alter table test_rename change arr arr1 array<int>")
+    sql("insert into test_rename values(array(7,8,9))")
+    // both rename and change datatype operation
+    sql("alter table test_rename change arr1 arr2 array<long>")
+    sql("insert into test_rename values(array(26557544541,3,46557544541))")
+    checkAnswer(sql("describe test_rename"), Seq(Row("arr2", "array<bigint>", null)))
+    checkAnswer(sql("select arr2 from test_rename"),
+      Seq(Row(make(Array(1, 2, 3))), Row(make(Array(4, 5, 6))), Row(make(Array(7, 8, 9))),
+        Row(make(Array(26557544541L, 3, 46557544541L)))))
+  }
+
+  test("test alter rename and change datatype for complex decimal types") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (strField struct<a:decimal(5,2)>," +
+        "mapField1 map<int,decimal(5,2)>, mapField2 map<int,struct<a:decimal(5,2)>>, " +
+        "arrField array<decimal(5,2)>) STORED AS carbondata")
+    sql("insert into test_rename values(named_struct('a', 123.45),map(1, 123.45)," +
+        "map(1, named_struct('a', 123.45)),array(123.45))")
+    sql("insert into test_rename values(named_struct('a', 123.45),map(2, 123.45)," +
+        "map(2, named_struct('a', 123.45)),array(123.45))")
+    // rename and change datatype
+    sql("alter table test_rename change strField strField1 struct<a1:decimal(6,2)>")
+    sql("alter table test_rename change mapField1 mapField11 map<int,decimal(6,2)>")
+    // rename and change nested decimal datatype
+    sql("alter table test_rename change mapField2 mapField22 map<int,struct<a2:decimal(6,2)>>")
+    sql("alter table test_rename change arrField arrField1 array<decimal(6,2)>")
+    sql("insert into test_rename values(named_struct('a', 1234.45),map(1, 1234.45)," +
+        "map(1, named_struct('a2', 1234.45)),array(1234.45))")
+    sql("insert into test_rename values(named_struct('a', 1234.45),map(2, 1234.45)," +
+        "map(2, named_struct('a2', 1234.45)),array(1234.45))")
+    sql("alter table test_rename compact 'minor'")
+    checkAnswer(sql("describe test_rename"),
+      Seq(Row("strfield1", "struct<a1:decimal(6,2)>", null),
+        Row("mapfield11", "map<int,decimal(6,2)>", null),
+        Row("mapfield22", "map<int,struct<a2:decimal(6,2)>>", null),
+        Row("arrfield1", "array<decimal(6,2)>", null)))
+    val result1 = java.math.BigDecimal.valueOf(123.45).setScale(2)
+    val result2 = java.math.BigDecimal.valueOf(1234.45).setScale(2)
+    checkAnswer(sql("select strField1,mapField11,mapField22,arrField1 from test_rename"),
+      Seq(Row(Row(result1), Map(1 -> result1), Map(1 -> Row(result1)),
+        make(Array(result1))),
+        Row(Row(result1), Map(2 -> result1), Map(2 -> Row(result1)),
+          make(Array(result1))),
+        Row(Row(result2), Map(1 -> result2), Map(1 -> Row(result2)),
+          make(Array(result2))),
+        Row(Row(result2), Map(2 -> result2), Map(2 -> Row(result2)),
+          make(Array(result2)))))
   }
 
   test("test change comment in case of complex types") {
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
index 6e60c45..e0e5c58 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
@@ -19,11 +19,14 @@ package org.apache.spark.carbondata.restructure.vectorreader
 
 import java.math.BigDecimal
 
+import scala.collection.mutable.WrappedArray.make
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class ChangeDataTypeTestCases extends QueryTest with BeforeAndAfterAll {
 
@@ -173,6 +176,55 @@ class ChangeDataTypeTestCases extends QueryTest with BeforeAndAfterAll {
     test_change_data_type()
   }
 
+  test("test alter change datatype for complex types") {
+    sql("drop table if exists test_rename")
+    sql("CREATE TABLE test_rename (name string) STORED AS carbondata")
+    // add complex columns
+    sql("alter table test_rename add columns(mapField1 MAP<int, int>, " +
+        "strField1 struct<a:int,b:decimal(5,2)>, arrField1 array<int>)")
+    sql("insert into test_rename values('df',map(5, 6),named_struct('a',1,'b', 123.45),array(1))")
+    // change datatype operation
+    sql("alter table test_rename change mapField1 mapField1 MAP<int, long>")
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table test_rename change strField1 strField1 struct<a:long,b:decimal(3,2)>")
+    }.getMessage
+      .contains(
+        "operation failed for default.test_rename: Alter table data type change or column rename " +
+        "operation failed: Given column strfield1.b cannot be modified. Specified precision value" +
+        " 3 should be greater than current precision value 5"))
+    sql("alter table test_rename change strField1 strField1 struct<a:long,b:decimal(6,2)>")
+    sql("alter table test_rename change arrField1 arrField1 array<long>")
+    sql("insert into test_rename values('sdf',map(7, 26557544541)," +
+        "named_struct('a',26557544541,'b', 1234.45),array(26557544541))")
+    // add nested complex columns
+    sql("alter table test_rename add columns(mapField2 MAP<int, array<int>>, " +
+        "strField2 struct<a:int,b:MAP<int, int>>, arrField2 array<struct<a:int>>)")
+    sql("insert into test_rename values('df',map(7, 26557544541),named_struct" +
+        "('a',26557544541,'b', 1234.45),array(26557544541),map(5, array(1))," +
+        "named_struct('a',1,'b',map(5,6)),array(named_struct('a',1)))")
+    // change datatype operation at nested level
+    sql("alter table test_rename change mapField2 mapField2 MAP<int, array<long>>")
+    sql("alter table test_rename change strField2 strField2 struct<a:int,b:map<int,long>>")
+    sql("alter table test_rename change arrField2 arrField2 array<struct<a:long>>")
+    sql("insert into test_rename values('sdf',map(7, 26557544541),named_struct" +
+        "('a',26557544541,'b', 1234.45),array(26557544541),map(7, array(26557544541))," +
+        "named_struct('a',2,'b', map(7, 26557544541)),array(named_struct('a',26557544541)))")
+    sql("alter table test_rename compact 'minor'")
+    val result1 = java.math.BigDecimal.valueOf(123.45).setScale(2)
+    val result2 = java.math.BigDecimal.valueOf(1234.45).setScale(2)
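+    // rows inserted before a column was added return null for that column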
+    checkAnswer(sql("select * from test_rename"),
+      Seq(
+        Row("df", Map(5 -> 6), Row(1, result1), make(Array(1)), null, null, null),
+        Row("sdf", Map(7 -> 26557544541L), Row(26557544541L, result2), make(Array(26557544541L)),
+          null, null, null),
+        Row("df", Map(7 -> 26557544541L), Row(26557544541L, result2), make(Array(26557544541L)),
+          Map(5 -> make(Array(1))), Row(1, Map(5 -> 6)), make(Array(Row(1)))),
+        Row("sdf", Map(7 -> 26557544541L), Row(26557544541L, result2), make(Array(26557544541L)),
+          Map(7 -> make(Array(26557544541L))), Row(2, Map(7 -> 26557544541L)),
+          make(Array(Row(26557544541L))))
+      ))
+  }
+
   override def afterAll {
     sqlContext.setConf("carbon.enable.vector.reader",
       CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)