Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/28 05:15:02 UTC

[carbondata] branch master updated: [CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a28dba  [CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv
2a28dba is described below

commit 2a28dba04236ce976984d9cbc398eb8fa517d6f5
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Apr 24 01:04:21 2019 +0530

    [CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv
    
    Inherit table properties from the main table to the mv datamap table if the
    datamap has a single parent table; otherwise, use the default table properties.
    Restrict Alter/Delete/Partition operations on MV tables.
    
    This closes #3184
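    
    For illustration, a minimal sketch of the inheritance behavior, written in the
    style of the test suites changed below (table, column and property names here
    are hypothetical, not part of this commit):
    
        // One parent table: the mv datamap table inherits its table properties
        // (for example SORT_COLUMNS); with multiple parent tables the datamap
        // table falls back to the default table properties instead.
        sql("create table sales(name string, price int) stored by 'carbondata' " +
            "tblproperties('sort_columns'='name')")
        sql("create datamap sales_mv using 'mv' as " +
            "select name, sum(price) from sales group by name")
        // the generated sales_mv_table should now carry the inherited properties
        sql("describe formatted sales_mv_table").show(100, false)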
---
 .../core/datamap/DataMapStoreManager.java          |  27 +-
 .../carbondata/core/datamap/DataMapUtil.java       |   1 +
 .../core/metadata/schema/table/CarbonTable.java    |  17 --
 .../core/metadata/schema/table/DataMapSchema.java  |  14 +
 .../carbondata/mv/datamap/MVDataMapProvider.scala  |  19 +-
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 110 ++++++--
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  | 287 +++++++++++++++++++++
 .../mv/rewrite/MVCountAndCaseTestCase.scala        |   2 -
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   |  29 +--
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  |   1 -
 .../mv/rewrite/MVMultiJoinTestCase.scala           |   8 +-
 .../carbondata/mv/rewrite/MVTpchTestCase.scala     |  10 +-
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 255 ++++++++++++++++++
 .../mv/rewrite/matching/TestSQLBatch.scala         |   4 +-
 .../preaggregate/TestPreAggregateLoad.scala        |   2 +-
 .../TestTimeSeriesUnsupportedSuite.scala           |   8 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   9 +-
 .../command/datamap/CarbonDropDataMapCommand.scala |   9 +
 .../management/CarbonCleanFilesCommand.scala       |   3 +-
 .../execution/command/mv/DataMapListeners.scala    | 146 ++++++++++-
 .../CarbonAlterTableDropHivePartitionCommand.scala |   7 +-
 .../preaaggregate/PreAggregateListeners.scala      |   6 +-
 .../preaaggregate/PreAggregateTableHelper.scala    | 102 +-------
 .../schema/CarbonAlterTableRenameCommand.scala     |   7 +-
 .../spark/sql/execution/strategy/DDLStrategy.scala |   4 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala       |  10 +-
 .../scala/org/apache/spark/util/DataMapUtil.scala  | 160 ++++++++++++
 27 files changed, 1054 insertions(+), 203 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 81b1fb2..89402c2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -281,19 +281,22 @@ public final class DataMapStoreManager {
       dataMapCatalogs = new ConcurrentHashMap<>();
       List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
       for (DataMapSchema schema : dataMapSchemas) {
-        DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName());
-        if (dataMapCatalog == null) {
-          dataMapCatalog = dataMapProvider.createDataMapCatalog();
-          if (null == dataMapCatalog) {
-            throw new RuntimeException("Internal Error.");
+        if (schema.getProviderName()
+            .equalsIgnoreCase(dataMapProvider.getDataMapSchema().getProviderName())) {
+          DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName());
+          if (dataMapCatalog == null) {
+            dataMapCatalog = dataMapProvider.createDataMapCatalog();
+            if (null == dataMapCatalog) {
+              throw new RuntimeException("Internal Error.");
+            }
+            dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog);
+          }
+          try {
+            dataMapCatalog.registerSchema(schema);
+          } catch (Exception e) {
+            // Ignore the schema
+            LOGGER.error("Error while registering schema", e);
           }
-          dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog);
-        }
-        try {
-          dataMapCatalog.registerSchema(schema);
-        } catch (Exception e) {
-          // Ignore the schema
-          LOGGER.error("Error while registering schema", e);
         }
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 0a604fb..e20f19a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -270,4 +270,5 @@ public class DataMapUtil {
     }
     return segmentList;
   }
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index afb5fd3..4f4475d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -63,7 +63,6 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
-import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV;
 import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
 
 import com.google.common.collect.Lists;
@@ -1321,22 +1320,6 @@ public class CarbonTable implements Serializable, Writable {
   }
 
   /**
-   * Return true if MV datamap present in the specified table
-   * @param carbonTable
-   * @return timeseries data map present
-   */
-  public static boolean hasMVDataMap(CarbonTable carbonTable) throws IOException {
-    List<DataMapSchema> dataMapSchemaList = DataMapStoreManager.getInstance()
-        .getDataMapSchemasOfTable(carbonTable);
-    for (DataMapSchema dataMapSchema : dataMapSchemaList) {
-      if (dataMapSchema.getProviderName().equalsIgnoreCase(MV.toString())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * Return all inverted index columns in this table
    */
   public List<ColumnSchema> getInvertedIndexColumns() {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index a48b03c..b927ce0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
@@ -80,6 +81,11 @@ public class DataMapSchema implements Serializable, Writable {
    */
   protected TableSchema childSchema;
 
+  /**
+   * map of each main table name to the main table columns mapped to the datamap table
+   */
+  private Map<String, Set<String>> mainTableColumnList;
+
   public DataMapSchema(String dataMapName, String providerName) {
     this.dataMapName = dataMapName;
     this.providerName = providerName;
@@ -250,4 +256,12 @@ public class DataMapSchema implements Serializable, Writable {
   @Override public int hashCode() {
     return Objects.hash(dataMapName);
   }
+
+  public Map<String, Set<String>> getMainTableColumnList() {
+    return mainTableColumnList;
+  }
+
+  public void setMainTableColumnList(Map<String, Set<String>> mainTableColumnList) {
+    this.mainTableColumnList = mainTableColumnList;
+  }
 }
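
The new mainTableColumnList maps each parent table name to the set of its columns
that feed the datamap table. A hedged sketch of how a caller might consult it; only
getMainTableColumnList is from this commit, the helper itself is hypothetical:

    def isColumnUsedInMV(schema: DataMapSchema, table: String, column: String): Boolean = {
      // populated at datamap creation time (see MVHelper below); may be null
      // for schemas written before this change
      val map = schema.getMainTableColumnList
      map != null && Option(map.get(table)).exists(_.contains(column))
    }
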
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 26c4cb6..90b7dbc 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -58,7 +58,21 @@ class MVDataMapProvider(
         "select statement is mandatory")
     }
     MVHelper.createMVDataMap(sparkSession, dataMapSchema, ctasSqlStatement, true)
-    DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+    try {
+      DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+      if (dataMapSchema.isLazy) {
+        DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+      }
+    } catch {
+      case exception: Exception =>
+        dropTableCommand = new CarbonDropTableCommand(true,
+          new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
+          dataMapSchema.getRelationIdentifier.getTableName,
+          true)
+        dropTableCommand.run(sparkSession)
+        DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+        throw exception
+    }
   }
 
   override def initData(): Unit = {
@@ -141,7 +155,8 @@ class MVDataMapProvider(
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
-        internalOptions = Map("mergedSegmentName" -> newLoadName),
+        internalOptions = Map("mergedSegmentName" -> newLoadName,
+          CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
         partition = Map.empty)
 
       try {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 810449c..4bcaa1d 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -29,17 +29,17 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.util.DataMapUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
-import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.datamap.DataMapManager
 import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select}
 import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog}
@@ -58,7 +58,12 @@ object MVHelper {
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val query = sparkSession.sql(updatedQuery)
     val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
-    validateMVQuery(sparkSession, logicalPlan)
+    val selectTables = getTables(logicalPlan)
+    if (selectTables.isEmpty) {
+      throw new MalformedCarbonCommandException(
+        s"Non-Carbon table does not support creating MV datamap")
+    }
+    val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
     val fullRebuild = isFullReload(logicalPlan)
     val fields = logicalPlan.output.map { attr =>
       val name = updateColumnName(attr)
@@ -81,29 +86,41 @@ object MVHelper {
       }
     }
     val tableProperties = mutable.Map[String, String]()
-    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
-
-    val selectTables = getTables(logicalPlan)
     val parentTables = new util.ArrayList[String]()
+    val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
     selectTables.foreach { selectTable =>
       val mainCarbonTable = try {
         Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
           selectTable.identifier.table)(sparkSession))
       } catch {
         // Exception handling if it's not a CarbonTable
-        case ex : Exception => None
+        case ex: Exception =>
+          throw new MalformedCarbonCommandException(
+            s"Non-Carbon table does not support creating MV datamap")
       }
       parentTables.add(mainCarbonTable.get.getTableName)
       if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
         throw new MalformedCarbonCommandException(
           s"Streaming table does not support creating MV datamap")
       }
+      parentTablesList.add(mainCarbonTable.get)
     }
     tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
     tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
 
-    // TODO inherit the table properties like sort order, sort scope and block size from parent
-    // tables to mv datamap table
+    val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan(
+      logicalPlan, queryString, sparkSession)
+    // If the dataMap is mapped to a single main table, inherit the table properties from the
+    // main table; otherwise, use the default table properties. If DMProperties contain table
+    // properties, the table properties of the datamap table are updated with them
+    if (parentTablesList.size() == 1) {
+      DataMapUtil
+        .inheritTablePropertiesFromMainTable(parentTablesList.get(0),
+          fields,
+          fieldRelationMap,
+          tableProperties)
+    }
+    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
     // TODO Use a proper DB
     val tableIdentifier =
     TableIdentifier(dataMapSchema.getDataMapName + "_table",
@@ -129,7 +146,28 @@ object MVHelper {
     CarbonCreateTableCommand(TableNewProcessor(tableModel),
       tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
 
-    dataMapSchema.setCtasQuery(queryString)
+    // Map list of main table columns mapped to datamap table and add to dataMapSchema
+    val mainTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
+    val mainTableFieldIterator = fieldRelationMap.values.asJava.iterator()
+    while (mainTableFieldIterator.hasNext) {
+      val value = mainTableFieldIterator.next()
+      value.columnTableRelationList.foreach {
+        columnTableRelation =>
+          columnTableRelation.foreach {
+            mainTable =>
+              if (null == mainTableToColumnsMap.get(mainTable.parentTableName)) {
+                val columns = new util.HashSet[String]()
+                columns.add(mainTable.parentColumnName)
+                mainTableToColumnsMap.put(mainTable.parentTableName, columns)
+              } else {
+                mainTableToColumnsMap.get(mainTable.parentTableName)
+                  .add(mainTable.parentColumnName)
+              }
+          }
+      }
+    }
+    dataMapSchema.setMainTableColumnList(mainTableToColumnsMap)
+    dataMapSchema.setCtasQuery(updatedQueryWithDb)
     dataMapSchema
       .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
         tableIdentifier.table,
@@ -143,14 +181,44 @@ object MVHelper {
     dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
     dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
     dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString)
-    DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
-    if (dataMapSchema.isLazy) {
-      DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+    try {
+      DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
+    } catch {
+      case ex: Exception =>
+        val dropTableCommand = CarbonDropTableCommand(true,
+          new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
+          dataMapSchema.getRelationIdentifier.getTableName,
+          true)
+        dropTableCommand.run(sparkSession)
+        throw ex
+    }
+  }
+
+  private def isValidSelect(isValidExp: Boolean,
+      s: Select): Boolean = {
+    // Make sure all predicates are present in projections.
+    var predicateList: Seq[AttributeReference] = Seq.empty
+    s.predicateList.map { f =>
+      f.children.collect {
+        case p: AttributeReference =>
+          predicateList = predicateList.+:(p)
+      }
+    }
+    if (predicateList.nonEmpty) {
+      predicateList.forall { p =>
+        s.outputList.exists {
+          case a: Alias =>
+            a.semanticEquals(p) || a.child.semanticEquals(p)
+          case other => other.semanticEquals(p)
+        }
+      }
+    } else {
+      isValidExp
     }
   }
 
   private def validateMVQuery(sparkSession: SparkSession,
-      logicalPlan: LogicalPlan): Unit = {
+      logicalPlan: LogicalPlan): String = {
     val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
       new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
     var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
@@ -169,17 +237,24 @@ object MVHelper {
     val isValid = modularPlan match {
       case g: GroupBy =>
         // Make sure all predicates are present in projections.
-        g.predicateList.forall{p =>
+        val isValidExp = g.predicateList.forall{p =>
           g.outputList.exists{
             case a: Alias =>
               a.semanticEquals(p) || a.child.semanticEquals(p)
             case other => other.semanticEquals(p)
           }
         }
+        g.child match {
+          case s: Select =>
+            isValidSelect(isValidExp, s)
+        }
+      case s: Select =>
+        isValidSelect(true, s)
       case _ => true
     }
     if (!isValid) {
-      throw new UnsupportedOperationException("Group by columns must be present in project columns")
+      throw new UnsupportedOperationException(
+        "Group by/Filter columns must be present in project columns")
     }
     if (catalog.isMVWithSameQueryPresent(logicalPlan)) {
       throw new UnsupportedOperationException("MV with same query present")
@@ -196,6 +271,7 @@ object MVHelper {
     if (!expressionValid) {
       throw new UnsupportedOperationException("MV doesn't support Coalesce")
     }
+    modularPlan.asCompactSQL
   }
 
   def updateColumnName(attr: Attribute): String = {
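
The stricter validation above means that group-by and filter (predicate) columns
must now appear in the MV projection, which is why the test changes below add the
filter columns to the create datamap statements. A sketch of the rule, assuming the
fact_table1 schema from the tests:

    // rejected with "Group by/Filter columns must be present in project columns":
    //   create datamap dm using 'mv' as
    //     select empname, designation from fact_table1 where deptname='cloud'
    // accepted once the filter column is projected:
    sql("create datamap dm using 'mv' as " +
        "select empname, designation, deptname from fact_table1 where deptname='cloud'")
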
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
new file mode 100644
index 0000000..6852695
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.datamap
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility object holding all the utility methods for mv datamap
+ */
+object MVUtil {
+
+  /**
+   * Below method will be used to validate and get the required fields from select plan
+   */
+  def getFieldsAndDataMapFieldsFromPlan(plan: LogicalPlan,
+      selectStmt: String,
+      sparkSession: SparkSession): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+    plan match {
+      case Project(projectList, child: Sort) =>
+        getFieldsFromProject(projectList, plan, child)
+      case Project(projectList, _) =>
+        getFieldsFromProject(projectList, plan)
+      case Aggregate(groupByExp, aggExp, _) =>
+        getFieldsFromAggregate(groupByExp, aggExp, plan)
+    }
+  }
+
+  def getFieldsFromProject(projectList: Seq[NamedExpression],
+      plan: LogicalPlan, sort: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+    var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    sort.transformDown {
+      case agg@Aggregate(groupByExp, aggExp, _) =>
+        fieldToDataMapFieldMap ++= getFieldsFromAggregate(groupByExp, aggExp, plan)
+        agg
+    }
+    fieldToDataMapFieldMap ++= getFieldsFromProject(projectList, plan)
+    fieldToDataMapFieldMap
+  }
+
+  def getFieldsFromProject(projectList: Seq[NamedExpression],
+      plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+    var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    val logicalRelation =
+      plan.collect {
+        case lr: LogicalRelation =>
+          lr
+      }
+    projectList.map {
+      case attr: AttributeReference =>
+        val carbonTable = getCarbonTable(logicalRelation, attr)
+        if (null != carbonTable) {
+          val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+          val relation = getColumnRelation(attr.name,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+            carbonTable)
+          if (null != relation) {
+            arrayBuffer += relation
+          }
+          fieldToDataMapFieldMap +=
+          getFieldToDataMapFields(attr.name,
+            attr.dataType,
+            attr.qualifier,
+            "",
+            arrayBuffer,
+            carbonTable.getTableName)
+        }
+      case Alias(attr: AttributeReference, name) =>
+        val carbonTable = getCarbonTable(logicalRelation, attr)
+        if (null != carbonTable) {
+          val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+          val relation = getColumnRelation(attr.name,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+            carbonTable)
+          if (null != relation) {
+            arrayBuffer += relation
+          }
+          fieldToDataMapFieldMap +=
+          getFieldToDataMapFields(name, attr.dataType, attr.qualifier, "", arrayBuffer, "")
+        }
+    }
+    fieldToDataMapFieldMap
+  }
+
+  def getFieldsFromAggregate(groupByExp: Seq[Expression],
+      aggExp: Seq[NamedExpression],
+      plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+    var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    val logicalRelation =
+      plan.collect {
+        case lr: LogicalRelation =>
+          lr
+      }
+    aggExp.map { agg =>
+      var aggregateType: String = ""
+      val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+      agg.collect {
+        case Alias(attr: AggregateExpression, name) =>
+          if (attr.aggregateFunction.isInstanceOf[Count]) {
+            fieldToDataMapFieldMap +=
+            getFieldToDataMapFields(name,
+              attr.aggregateFunction.dataType,
+              None,
+              attr.aggregateFunction.nodeName,
+              arrayBuffer,
+              "")
+          } else {
+            aggregateType = attr.aggregateFunction.nodeName
+          }
+        case Alias(_, name) =>
+          // In case of arithmetic expressions like sum(a)+sum(b)
+          aggregateType = "arithmetic"
+      }
+      agg.collect {
+        case attr: AttributeReference =>
+          val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+          if (null != carbonTable) {
+            val relation = getColumnRelation(attr.name,
+              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+              carbonTable)
+            if (null != relation) {
+              arrayBuffer += relation
+            }
+            if (aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+              val tableName = carbonTable.getTableName
+              fieldToDataMapFieldMap +=
+              getFieldToDataMapFields(agg.name,
+                agg.dataType,
+                attr.qualifier,
+                aggregateType,
+                arrayBuffer,
+                tableName)
+            }
+          }
+      }
+      if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) {
+        fieldToDataMapFieldMap +=
+        getFieldToDataMapFields(agg.name,
+          agg.dataType,
+          agg.qualifier,
+          aggregateType,
+          arrayBuffer,
+          "")
+      }
+    }
+    groupByExp map { grp =>
+      grp.collect {
+        case attr: AttributeReference =>
+          val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+          if (null != carbonTable) {
+            val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
+                ArrayBuffer[ColumnTableRelation]()
+            arrayBuffer += getColumnRelation(attr.name,
+              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+              carbonTable)
+            fieldToDataMapFieldMap +=
+            getFieldToDataMapFields(attr.name,
+              attr.dataType,
+              attr.qualifier,
+              "",
+              arrayBuffer,
+              carbonTable.getTableName)
+          }
+      }
+    }
+    fieldToDataMapFieldMap
+  }
+
+  /**
+   * Below method will be used to get the column relation with the parent column
+   */
+  def getColumnRelation(parentColumnName: String,
+      parentTableId: String,
+      parentTableName: String,
+      parentDatabaseName: String,
+      carbonTable: CarbonTable): ColumnTableRelation = {
+    val parentColumn = carbonTable.getColumnByName(parentTableName, parentColumnName)
+    var columnTableRelation: ColumnTableRelation = null
+    if (null != parentColumn) {
+      val parentColumnId = parentColumn.getColumnId
+      columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName,
+        parentColumnId = parentColumnId,
+        parentTableName = parentTableName,
+        parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+      columnTableRelation
+    } else {
+      columnTableRelation
+    }
+  }
+
+  /**
+   * This method is used to get carbon table for corresponding attribute reference
+   * from logical relation
+   */
+  private def getCarbonTable(logicalRelation: Seq[LogicalRelation],
+      attr: AttributeReference) = {
+    val relations = logicalRelation
+      .filter(lr => lr.output
+        .exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) &&
+                           attrRef.exprId.equals(attr.exprId)))
+    if (relations.nonEmpty) {
+      relations
+        .head.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+        .metaData.carbonTable
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Below method will be used to get the fields object for mv table
+   */
+  private def getFieldToDataMapFields(name: String,
+      dataType: DataType,
+      qualifier: Option[String],
+      aggregateType: String,
+      columnTableRelationList: ArrayBuffer[ColumnTableRelation],
+      parentTableName: String) = {
+    var actualColumnName =
+      name.replace("(", "_")
+        .replace(")", "")
+        .replace(" ", "_")
+        .replace("=", "")
+        .replace(",", "")
+    if (qualifier.isDefined) {
+      actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
+        .getOrElse(actualColumnName)
+    }
+    if (qualifier.isEmpty) {
+      if (aggregateType.isEmpty && !parentTableName.isEmpty) {
+        actualColumnName = parentTableName + "_" + actualColumnName
+      }
+    }
+    val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
+    val dataMapField = DataMapField(aggregateType, Some(columnTableRelationList))
+    if (dataType.typeName.startsWith("decimal")) {
+      val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString)
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        precision = precision,
+        scale = scale,
+        rawSchema = rawSchema), dataMapField)
+    } else {
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        rawSchema = rawSchema), dataMapField)
+    }
+  }
+}
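
The name mangling in getFieldToDataMapFields is what produces datamap table columns
such as maintable_name and sum_price in the tests below. A minimal standalone sketch
of the same transformation, assuming no qualifier is present:

    def mangle(name: String, aggregateType: String, parentTableName: String): String = {
      val base = name.replace("(", "_").replace(")", "")
        .replace(" ", "_").replace("=", "").replace(",", "")
      // plain (non-aggregate) columns are prefixed with the parent table name
      if (aggregateType.isEmpty && parentTableName.nonEmpty) parentTableName + "_" + base
      else base
    }
    mangle("name", "", "maintable")   // maintable_name
    mangle("sum(price)", "sum", "")   // sum_price
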
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
index 567d6a9..af4afb6 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala
@@ -47,8 +47,6 @@ class MVCountAndCaseTestCase  extends QueryTest with BeforeAndAfterAll{
            | FROM data_table
            | GROUP BY STARTTIME,LAYER4ID""".stripMargin)
 
-    sql("rebuild datamap data_table_mv")
-
     var frame = sql(s"""SELECT  MT.`3600` AS `3600`,
                        | MT.`2250410101` AS `2250410101`,
                        | count(1) over() as countNum,
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 5016bbe..4f5423e 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -180,7 +180,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test create datamap with simple and same projection with datamap filter on non projection column and extra column filter") {
-    sql("create datamap datamap9 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
+    sql("create datamap datamap9 using 'mv' as select empname, designation,deptname  from fact_table1 where deptname='cloud'")
     val frame = sql("select empname,designation from fact_table1 where deptname='cloud'")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -189,7 +189,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test create datamap with simple and same projection with datamap filter on non projection column and no column filter") {
-    sql("create datamap datamap10 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
+    sql("create datamap datamap10 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'")
     val frame = sql("select empname,designation from fact_table1")
     val analyzed = frame.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap10"))
@@ -198,7 +198,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test create datamap with simple and same projection with datamap filter on non projection column and different column filter") {
-    sql("create datamap datamap11 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
+    sql("create datamap datamap11 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'")
     val frame = sql("select empname,designation from fact_table1 where designation='SA'")
     val analyzed = frame.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap11"))
@@ -327,7 +327,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple join and filter on query") {
     sql("drop datamap if exists datamap22")
-    sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
+    sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation,t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
       "t2.empname and t1.empname='shivani'")
@@ -341,7 +341,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple join and filter on query and datamap") {
     sql("drop datamap if exists datamap23")
-    sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
+    sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
       "t2.empname and t1.empname='shivani'")
@@ -354,7 +354,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple join and filter on datamap and no filter on query") {
     sql("drop datamap if exists datamap24")
-    sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
+    sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
@@ -365,7 +365,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with multiple join") {
     sql("drop datamap if exists datamap25")
-    sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3  on (t1.empname=t3.empname)")
+    sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation, t2.empname, t3.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3  on (t1.empname=t3.empname)")
     val frame = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
@@ -379,7 +379,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   ignore("test create datamap with simple join on datamap and multi join on query") {
-    sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
+    sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2,fact_table3 " +
       "t3  where t1.empname = t2.empname and t1.empname=t3.empname")
@@ -391,7 +391,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test create datamap with join with group by") {
-    sql("create datamap datamap27 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+    sql("create datamap datamap27 using 'mv' as select  t1.empname , t2.designation, sum(t1.utilization), sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname group by t1.empname, t2.designation")
@@ -404,7 +404,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with join with group by and sub projection") {
     sql("drop datamap if exists datamap28")
-    sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+    sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
     val frame = sql(
       "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where " +
       "t1.empname = t2.empname group by t2.designation")
@@ -417,7 +417,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with join with group by and sub projection with filter") {
     sql("drop datamap if exists datamap29")
-    sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+    sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
     val frame = sql(
       "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where " +
       "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation")
@@ -430,7 +430,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   ignore("test create datamap with join with group by with filter") {
     sql("drop datamap if exists datamap30")
-    sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
+    sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname and t2.designation='SA' group by t1.empname, t2.designation")
@@ -612,7 +612,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with left join on query and equi join on mv with group by with filter") {
     sql("drop datamap if exists datamap45")
 
-    sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 join fact_table2 t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
     // During spark optimizer it converts the left outer join queries with equi join if any filter present on right side table
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  " +
@@ -649,7 +649,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("jira carbondata-2528-2") {
-
     sql("drop datamap if exists MV_order")
     sql("drop datamap if exists MV_desc_order")
     sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname")
@@ -910,7 +909,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     
     val exception_tb_mv2: Exception = intercept[Exception] {
       sql("create datamap dm_stream_test2 using 'mv' as select t1.empname as c1, t2.designation, " +
-          "t2.empname as c2 from (fact_table1 t1 inner join fact_streaming_table2 t2  " +
+          "t2.empname as c2,t3.empname from (fact_table1 t1 inner join fact_streaming_table2 t2  " +
           "on (t1.empname = t2.empname)) inner join fact_table_parquet t3 " +
           "on (t1.empname = t3.empname)")
     }
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index bbd7b4c..2e64055 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -182,7 +182,6 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
     sql(s"rebuild datamap datamap1")
     loadDataToFactTable("test_table")
     sql(s"rebuild datamap datamap1")
-    checkExistence(sql("show segments for table datamap1_table"), false, "0.1")
     sql("alter datamap datamap1 compact 'major'")
     val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
index bfd621d..4e3eb10 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala
@@ -48,8 +48,9 @@ class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll {
          |inner join areas as c on c.pid=p.aid
          |where p.title = 'hebei'
        """.stripMargin
-    sql("create datamap table_mv using 'mv' as " + mvSQL)
-    sql("rebuild datamap table_mv")
+    sql("create datamap table_mv using 'mv' as " +
+        "select p.title,c.title,c.pid,p.aid from areas as p inner join areas as c on " +
+        "c.pid=p.aid where p.title = 'hebei'")
     val frame = sql(mvSQL)
     assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
     checkAnswer(frame, Seq(Row("hebei","shijiazhuang"), Row("hebei","handan")))
@@ -70,8 +71,7 @@ class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll {
          | left join dim_table dim_other on sdr.name = dim_other.name
          | group by sdr.name,dim.age,dim_other.height
        """.stripMargin
-    sql("create datamap table_mv using 'mv' as " + mvSQL)
-    sql("rebuild datamap table_mv")
+    sql("create datamap table_mv using 'mv' as " + "select sdr.name,sum(sdr.score),dim.age,dim_other.height,count(dim.name) as c1, count(dim_other.name) as c2 from sdr_table sdr left join dim_table dim on sdr.name = dim.name left join dim_table dim_other on sdr.name = dim_other.name group by sdr.name,dim.age,dim_other.height")
     val frame = sql(mvSQL)
     assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv"))
     checkAnswer(frame, Seq(Row("lily",80,30,160),Row("tom",120,20,170)))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
index 5788a23..b5d874a 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
@@ -110,7 +110,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with tpch3 with no filters on mv") {
     sql(s"drop datamap if exists datamap5")
-    sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
+    sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate, c_custkey as c1, o_custkey as c2,o_orderkey as o1  from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate,c_custkey,o_custkey, o_orderkey ")
     val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -150,7 +150,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with tpch5 with no filters on mv") {
     sql(s"drop datamap if exists datamap8")
-    sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey  group by n_name,o_orderdate,r_name")
+    sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue, sum(c_custkey), sum(o_custkey), sum(l_orderkey),sum(o_orderkey), sum(l_suppkey), sum(s_suppkey), sum(c_nationkey), sum(s_nationkey), sum(n_nationkey), sum(n_regionkey), sum(r_regionkey)  from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey an [...]
     val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap8"))
@@ -160,7 +160,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with tpch6") {
     sql(s"drop datamap if exists datamap9")
-    sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
+    sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue, count(l_shipdate), sum(l_discount),sum(l_quantity)  from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
     val df = sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -182,7 +182,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with tpch7 part of query1") {
     sql(s"drop datamap if exists datamap11")
-    sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+    sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount, s_suppkey,l_suppkey, o_orderkey,l_orderkey, c_custkey, o_custkey, s_nationkey,  n1.n_nationkey, c_nationkey from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
     val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('1996-12-31')")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap11"))
@@ -192,7 +192,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with tpch7 part of query2 (core issue)") {
     sql(s"drop datamap if exists datamap12")
-    sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+    sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount,s_suppkey, l_suppkey,o_orderkey,l_orderkey, c_custkey,o_custkey,s_nationkey,  n1.n_nationkey,c_nationkey from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
     val df = sql("select supp_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('19 [...]
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap12"))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
new file mode 100644
index 0000000..3978bd1
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -0,0 +1,255 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
+/**
+ * Test class for MV DataMap to verify all scenarios
+ */
+class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop table IF EXISTS testtable")
+    sql("create table testtable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table testtable select 'abc',21,2000")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " +
+        "from maintable group by name")
+    sql("rebuild datamap dm1")
+    checkResult()
+  }
+
+  private def checkResult(): Unit = {
+    checkAnswer(sql("select name,sum(price) from maintable group by name"),
+      sql("select name,sum(price) from maintable group by name"))
+  }
+
+  override def afterEach(): Unit = {
+    sql("drop table IF EXISTS maintable")
+    sql("drop table IF EXISTS testtable")
+    sql("drop table if exists par_table")
+  }
+
+  test("test alter add column on maintable") {
+    sql("alter table maintable add columns(d int)")
+    sql("insert into table maintable select 'abc',21,2000,30")
+    sql("rebuild datamap dm1")
+    checkResult()
+  }
+
+  test("test alter add column on datamaptable") {
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table dm1_table add columns(d int)")
+    }.getMessage.contains("Cannot add columns in a DataMap table default.dm1_table"))
+  }
+
+  test("test drop column on maintable") {
+    // check drop column not present in datamap table
+    sql("alter table maintable drop columns(c_code)")
+    checkResult()
+    // check drop column present in datamap table
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table maintable drop columns(name)")
+    }.getMessage.contains("Column name cannot be dropped because it exists in mv datamap: dm1"))
+  }
+
+  test("test alter drop column on datamaptable") {
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table dm1_table drop columns(maintable_name)")
+    }.getMessage.contains("Cannot drop columns present in a datamap table default.dm1_table"))
+  }
+
+  test("test rename column on maintable") {
+    // check rename column not present in datamap table
+    sql("alter table maintable change c_code d_code int")
+    checkResult()
+    // check rename column present in mv datamap table
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table maintable change name name1 string")
+    }.getMessage.contains("Column name exists in a mv datamap. Drop mv datamap to continue"))
+  }
+
+  test("test alter rename column on datamaptable") {
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table dm1_table change sum_price sum_cost int")
+    }.getMessage.contains("Cannot change data type or rename column for columns " +
+                          "present in mv datamap table default.dm1_table"))
+  }
+
+  test("test alter rename table") {
+    // check rename maintable
+    assert(intercept[MalformedCarbonCommandException] {
+      sql("alter table maintable rename to maintable_rename")
+    }.getMessage.contains(
+      "alter rename is not supported for datamap table or for tables which have child datamap"))
+    // check rename datamaptable
+    assert(intercept[MalformedCarbonCommandException] {
+      sql("alter table dm1_table rename to dm11_table")
+    }.getMessage.contains(
+      "alter rename is not supported for datamap table or for tables which have child datamap"))
+  }
+
+  test("test alter change datatype") {
+    // change datatype for column present in mv datamap
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table maintable change price price bigint")
+    }.getMessage.contains("Column price exists in a mv datamap. Drop mv datamap to continue"))
+    // change datatype for column not present in datamap table
+    sql("alter table maintable change c_code c_code bigint")
+    checkResult()
+    // change datatype for column present in datamap table
+    assert(intercept[ProcessMetaDataException] {
+      sql("alter table dm1_table change sum_price sum_price bigint")
+    }.getMessage.contains("Cannot change data type or rename column for columns " +
+                          "present in mv datamap table default.dm1_table"))
+  }
+
+  test("test dmproperties") {
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED REBUILD dmproperties" +
+        "('LOCAL_DICTIONARY_ENABLE'='false') as select name,price from maintable")
+    checkExistence(sql("describe formatted dm1_table"), true, "Local Dictionary Enabled false")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED REBUILD dmproperties('TABLE_BLOCKSIZE'='256 MB') " +
+        "as select name,price from maintable")
+    checkExistence(sql("describe formatted dm1_table"), true, "Table Block Size  256 MB")
+  }
+
+  test("test table properties") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata' tblproperties('LOCAL_DICTIONARY_ENABLE'='false')")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1  using 'mv' WITH DEFERRED REBUILD as select name,price from maintable")
+    checkExistence(sql("describe formatted dm1_table"), true, "Local Dictionary Enabled false")
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata' tblproperties('TABLE_BLOCKSIZE'='256 MB')")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1  using 'mv' WITH DEFERRED REBUILD as select name,price from maintable")
+    checkExistence(sql("describe formatted dm1_table"), true, "Table Block Size  256 MB")
+  }
+
+  test("test delete segment by id on main table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("Delete from table maintable where segment.id in (0)")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " +
+        "from maintable group by name")
+    sql("rebuild datamap dm1")
+    assert(intercept[UnsupportedOperationException] {
+      sql("Delete from table maintable where segment.id in (1)")
+    }.getMessage.contains(
+      "Delete segment operation is not supported on tables having a child datamap"))
+    assert(intercept[UnsupportedOperationException] {
+      sql("Delete from table dm1_table where segment.id in (0)")
+    }.getMessage.contains("Delete segment operation is not supported on a datamap table"))
+  }
+
+  test("test delete segment by date on main table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("Delete from table maintable where segment.id in (0)")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1  using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " +
+        "from maintable group by name")
+    sql("rebuild datamap dm1")
+    assert(intercept[UnsupportedOperationException] {
+      sql("DELETE FROM TABLE maintable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+    }.getMessage.contains(
+      "Delete segment operation is not supported on tables having a child datamap"))
+    assert(intercept[UnsupportedOperationException] {
+      sql("DELETE FROM TABLE dm1_table WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+    }.getMessage.contains("Delete segment operation is not supported on a datamap table"))
+  }
+
+  test("test partition table with mv") {
+    sql("drop table if exists par_table")
+    sql("CREATE TABLE par_table(id INT, name STRING, age INT) PARTITIONED BY(city string) STORED BY 'carbondata'")
+    sql("insert into par_table values(1,'abc',3,'def')")
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 using 'mv' WITH DEFERRED REBUILD as select city, id from par_table")
+    sql("rebuild datamap p1")
+    assert(intercept[MalformedCarbonCommandException] {
+      sql("alter table par_table drop partition (city='def')")
+    }.getMessage.contains(
+      "Drop Partition is not supported for datamap table or for tables which have child datamap"))
+    sql("drop datamap if exists p1")
+  }
+
+  test("test direct load to mv datamap table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name " +
+        "from maintable")
+    sql("rebuild datamap dm1")
+    assert(intercept[UnsupportedOperationException] {
+      sql("insert into dm1_table select 2")
+    }.getMessage.contains("Cannot insert/load data directly into pre-aggregate/child table"))
+    sql("drop table IF EXISTS maintable")
+  }
+
+
+  test("test drop datamap with tablename") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm1 on table maintable")
+    sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select price " +
+        "from maintable")
+    sql("rebuild datamap dm1")
+    checkAnswer(sql("select price from maintable"), Seq(Row(2000)))
+    checkExistence(sql("show datamap on table maintable"), true, "dm1")
+    sql("drop datamap dm1 on table maintable")
+    checkExistence(sql("show datamap on table maintable"), false, "dm1")
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test mv with attribute having qualifier") {
+    sql("drop table if exists maintable")
+    sql("create table maintable (product string) partitioned by (amount int) stored by 'carbondata' ")
+    sql("insert into maintable values('Mobile',2000)")
+    sql("drop datamap if exists p")
+    sql("Create datamap p using 'mv' as Select p.product, p.amount from maintable p where p.product = 'Mobile'")
+    sql("rebuild datamap p")
+    checkAnswer(sql("Select p.product, p.amount from maintable p where p.product = 'Mobile'"), Seq(Row("Mobile", 2000)))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test mv with non-carbon table") {
+    sql("drop table if exists noncarbon")
+    sql("create table noncarbon (product string,amount int)")
+    sql("insert into noncarbon values('Mobile',2000)")
+    sql("drop datamap if exists p")
+    assert(intercept[MalformedCarbonCommandException] {
+      sql("Create datamap p using 'mv' as Select product from noncarbon")
+    }.getMessage.contains("Non-Carbon table does not support creating MV datamap"))
+    sql("drop table if exists noncarbon")
+  }
+
+}
+
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
index 96f1816..07cd232 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala
@@ -23,9 +23,9 @@ object TestSQLBatch {
   val sampleTestCases = Seq(
     ("case_1",
      s"""
-        |SELECT i_item_id
+        |SELECT i_item_id, i_item_sk
         |FROM Item
-        |WHERE i_item_sk = 1
+        |WHERE i_item_sk = 2
      """.stripMargin.trim,
      s"""
         |SELECT i_item_id, i_item_sk
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 49a62a2..7ba8300 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -211,7 +211,7 @@ class TestPreAggregateLoad extends SparkQueryTest with BeforeAndAfterAll with Be
         .stripMargin)
     assert(intercept[RuntimeException] {
       sql(s"insert into maintable_preagg_sum values(1, 30)")
-    }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table"))
+    }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate/child table"))
   }
 
   test("test whether all segments are loaded into pre-aggregate table if segments are set on main table") {
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
index a7e425c..7fa2672 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
@@ -71,7 +71,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
       sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 60)")
     }
     assert(e.getMessage.equalsIgnoreCase(
-      "Cannot insert/load data directly into pre-aggregate table"))
+      "Cannot insert/load data directly into pre-aggregate/child table"))
 
     // check value after inserting
     checkAnswer(sql("SELECT * FROM maintable_agg1_minute"),
@@ -94,7 +94,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
       sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 60)")
     }
     assert(e.getMessage.equalsIgnoreCase(
-      "Cannot insert/load data directly into pre-aggregate table"))
+      "Cannot insert/load data directly into pre-aggregate/child table"))
   }
 
   test("test timeseries unsupported 3: don't support insert") {
@@ -118,7 +118,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
       sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 'hello', 60)")
     }
     assert(e.getMessage.equalsIgnoreCase(
-      "Cannot insert/load data directly into pre-aggregate table"))
+      "Cannot insert/load data directly into pre-aggregate/child table"))
   }
 
   test("test timeseries unsupported 4: don't support load") {
@@ -147,7 +147,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
          """.stripMargin)
     }
     assert(e.getMessage.equalsIgnoreCase(
-      "Cannot insert/load data directly into pre-aggregate table"))
+      "Cannot insert/load data directly into pre-aggregate/child table"))
   }
 
   test("test timeseries unsupported 5: don't support update") {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d9dec68..3c6b265 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
 import org.apache.spark.sql.execution.command.cache._
-import org.apache.spark.sql.execution.command.mv.{AlterDataMaptableCompactionPostListener, LoadPostDataMapListener}
+import org.apache.spark.sql.execution.command.mv._
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -195,6 +195,13 @@ object CarbonEnv {
       .addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener)
       .addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener)
       .addListener(classOf[ShowTableCacheEvent], ShowCacheBloomEventListener)
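+      // Pre-event listeners that block unsupported operations (delete segment,
+      // add/drop/rename column, datatype change) on tables having an mv datamap
+      // and on mv datamap tables themselves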
+      .addListener(classOf[DeleteSegmentByIdPreEvent], DataMapDeleteSegmentPreListener)
+      .addListener(classOf[DeleteSegmentByDatePreEvent], DataMapDeleteSegmentPreListener)
+      .addListener(classOf[AlterTableDropColumnPreEvent], DataMapDropColumnPreListener)
+      .addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
+        DataMapChangeDataTypeorRenameColumnPreListener)
+      .addListener(classOf[AlterTableAddColumnPreEvent], DataMapAddColumnsPreListener)
+
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 0bafe04..b4e60fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -122,6 +122,15 @@ case class CarbonDropDataMapCommand(
             dropDataMapFromSystemFolder(sparkSession)
             return Seq.empty
           }
+        } else if (mainTable != null) {
+          // If the main table is set and the datamap is an mv datamap,
+          // drop it from the system folder
+          val dmSchema = DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala
+            .filter(dataMapSchema => dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName))
+          if (dmSchema.nonEmpty && (!dmSchema.head.isIndexDataMap &&
+                                    null != dmSchema.head.getRelationIdentifier)) {
+            dropDataMapFromSystemFolder(sparkSession)
+            return Seq.empty
+          }
         }
 
         // drop preaggregate datamap.
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index c142398..a2a9d3c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, DataCommand}
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.util.DataMapUtil
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -71,7 +72,7 @@ case class CarbonCleanFilesCommand(
             isInternalCleanCall = true)
       }.toList
       cleanFileCommands.foreach(_.processMetadata(sparkSession))
-    } else if (CarbonTable.hasMVDataMap(carbonTable)) {
+    } else if (DataMapUtil.hasMVDataMap(carbonTable)) {
       val allDataMapSchemas = DataMapStoreManager.getInstance
         .getDataMapSchemasOfTable(carbonTable).asScala
         .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
index f94b73a..5b1d7e5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
@@ -18,22 +18,32 @@
 package org.apache.spark.sql.execution.command.mv
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import org.apache.spark.util.DataMapUtil
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.datamap.DataMapManager
-import org.apache.carbondata.events.{
-  AlterTableCompactionPreStatusUpdateEvent,
-  DeleteFromTablePostEvent, Event, OperationContext, OperationEventListener, UpdateTablePostEvent
-}
+import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
 import org.apache.carbondata.processing.merger.CompactionType
 
+
+object DataMapListeners {
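+  /**
+   * Returns the main table columns referenced by the given datamap schema. Used by
+   * the listeners below to detect columns that cannot be altered or dropped.
+   */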
+  def getDataMapTableColumns(dataMapSchema: DataMapSchema,
+      carbonTable: CarbonTable): mutable.Buffer[String] = {
+    dataMapSchema.getMainTableColumnList.get(carbonTable.getTableName).asScala
+  }
+}
+
 /**
  * Listener to trigger compaction on mv datamap after main table compaction
  */
@@ -139,3 +149,129 @@ object LoadPostDataMapListener extends OperationEventListener {
     }
   }
 }
+
+/**
+ * Listener to block delete segment operations (by id or by date) on tables
+ * having an mv datamap and on mv datamap tables
+ */
+object DataMapDeleteSegmentPreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
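+    // Both delete segment events carry the target carbon table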
+    val carbonTable = event match {
+      case e: DeleteSegmentByIdPreEvent => e.carbonTable
+      case e: DeleteSegmentByDatePreEvent => e.carbonTable
+    }
+    if (null != carbonTable) {
+      if (DataMapUtil.hasMVDataMap(carbonTable)) {
+        throw new UnsupportedOperationException(
+          "Delete segment operation is not supported on tables having child datamap")
+      }
+      if (carbonTable.isChildTable) {
+        throw new UnsupportedOperationException(
+          "Delete segment operation is not supported on datamap table")
+      }
+    }
+  }
+}
+
+object DataMapAddColumnsPreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val dataTypeChangePreListener = event.asInstanceOf[AlterTableAddColumnPreEvent]
+    val carbonTable = dataTypeChangePreListener.carbonTable
+    if (carbonTable.isChildTable) {
+      throw new UnsupportedOperationException(
+        s"Cannot add columns in a DataMap table " +
+        s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+    }
+  }
+}
+
+
+object DataMapDropColumnPreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val dropColumnChangePreListener = event.asInstanceOf[AlterTableDropColumnPreEvent]
+    val carbonTable = dropColumnChangePreListener.carbonTable
+    val alterTableDropColumnModel = dropColumnChangePreListener.alterTableDropColumnModel
+    val columnsToBeDropped = alterTableDropColumnModel.columns
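+    // Block the drop if any mv datamap on this table references one of the columns
+    // to be dropped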
+    if (DataMapUtil.hasMVDataMap(carbonTable)) {
+      val dataMapSchemaList = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(carbonTable).asScala
+      for (dataMapSchema <- dataMapSchemaList) {
+        if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) {
+          val listOfColumns = DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable)
+          val columnExistsInChild = listOfColumns.collectFirst {
+            case parentColumnName if columnsToBeDropped.contains(parentColumnName) =>
+              parentColumnName
+          }
+          if (columnExistsInChild.isDefined) {
+            throw new UnsupportedOperationException(
+              s"Column ${ columnExistsInChild.head } cannot be dropped because it exists " +
+              s"in " + dataMapSchema.getProviderName + " datamap:" +
+              s"${ dataMapSchema.getDataMapName }")
+          }
+        }
+      }
+    }
+    if (carbonTable.isChildTable) {
+      throw new UnsupportedOperationException(
+        s"Cannot drop columns present in a datamap table ${ carbonTable.getDatabaseName }." +
+        s"${ carbonTable.getTableName }")
+    }
+  }
+}
+
+object DataMapChangeDataTypeorRenameColumnPreListener
+  extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val colRenameDataTypeChangePreListener = event
+      .asInstanceOf[AlterTableColRenameAndDataTypeChangePreEvent]
+    val carbonTable = colRenameDataTypeChangePreListener.carbonTable
+    val alterTableDataTypeChangeModel = colRenameDataTypeChangePreListener
+      .alterTableDataTypeChangeModel
+    val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName
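+    // Block the operation if any mv datamap on this table references the column,
+    // or if the table is itself a datamap table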
+    if (DataMapUtil.hasMVDataMap(carbonTable)) {
+      val dataMapSchemaList = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(carbonTable).asScala
+      for (dataMapSchema <- dataMapSchemaList) {
+        if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) {
+          val listOfColumns = DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable)
+          if (listOfColumns.contains(columnToBeAltered)) {
+            throw new UnsupportedOperationException(
+              s"Column $columnToBeAltered exists in a " + dataMapSchema.getProviderName +
+              " datamap. Drop " + dataMapSchema.getProviderName + "  datamap to continue")
+          }
+        }
+      }
+    }
+    if (carbonTable.isChildTable) {
+      throw new UnsupportedOperationException(
+        s"Cannot change data type or rename column for columns present in mv datamap table " +
+        s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index a9b581c..9119375 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -26,8 +26,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, DataMapUtil}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -69,6 +70,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
     table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
     setAuditTable(table)
     setAuditInfo(Map("partition" -> specs.mkString(",")))
+    if (DataMapUtil.hasMVDataMap(table) || table.isChildTable) {
+      throw new MalformedCarbonCommandException(
+        "Drop Partition is not supported for datamap table or for tables which have child datamap")
+    }
     if (table.isHivePartitionTable) {
       var locks = List.empty[ICarbonLock]
       try {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 4a0b492..c4c3539 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil, Segment}
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
@@ -717,9 +717,9 @@ object LoadPreAggregateTablePreListener extends OperationEventListener {
     val carbonLoadModel = loadEvent.getCarbonLoadModel
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
-    if (table.isChildDataMap && !isInternalLoadCall) {
+    if ((table.isChildDataMap || table.isChildTable) && !isInternalLoadCall) {
       throw new UnsupportedOperationException(
-        "Cannot insert/load data directly into pre-aggregate table")
+        "Cannot insert/load data directly into pre-aggregate/child table")
     }
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index b729347..86b2d00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.util.PartitionUtils
+import org.apache.spark.util.{DataMapUtil, PartitionUtils}
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
@@ -90,104 +90,8 @@ case class PreAggregateTableHelper(
       throw new MalformedDataMapCommandException(
         "Parent table name is different in select and create")
     }
-    var neworder = Seq[String]()
-    val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
-    parentOrder.foreach(parentcol =>
-      fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
-                           parentcol.equals(fieldRelationMap(col).
-                             columnTableRelationList.get(0).parentColumnName))
-        .map(cols => neworder :+= cols.column))
-    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
-    tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
-      getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
-      .LOAD_SORT_SCOPE_DEFAULT))
-    tableProperties
-      .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
-    tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
-      parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
-        CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
-
-    // Datamap table name and columns are automatically added prefix with parent table name
-    // in carbon. For convenient, users can type column names same as the ones in select statement
-    // when config dmproperties, and here we update column names with prefix.
-    // If longStringColumn is not present in dm properties then we take long_string_columns from
-    // the parent table.
-    var longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
-    if (longStringColumn.isEmpty) {
-      val longStringColumnInParents = parentTable.getTableInfo.getFactTable.getTableProperties
-        .asScala
-        .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim)
-      val varcharDatamapFields = scala.collection.mutable.ArrayBuffer.empty[String]
-      fieldRelationMap foreach (fields => {
-        val aggFunc = fields._2.aggregateFunction
-        val relationList = fields._2.columnTableRelationList
-        // check if columns present in datamap are long_string_col in parent table. If they are
-        // long_string_columns in parent, make them long_string_columns in datamap
-        if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents
-          .contains(relationList.head.head.parentColumnName)) {
-          varcharDatamapFields += relationList.head.head.parentColumnName
-        }
-      })
-      if (!varcharDatamapFields.isEmpty) {
-        longStringColumn = Option(varcharDatamapFields.mkString(","))
-      }
-    }
-
-    if (longStringColumn != None) {
-      val fieldNames = fields.map(_.column)
-      val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map{ colName =>
-        val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
-        if (!fieldNames.contains(newColName)) {
-          throw new MalformedDataMapCommandException(
-            CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + colName
-              + " does not in datamap")
-        }
-        newColName
-      }
-      tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS,
-        newLongStringColumn.mkString(","))
-    }
-
-    // inherit the local dictionary properties of main parent table
-    tableProperties
-      .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
-        parentTable.getTableInfo.getFactTable.getTableProperties.asScala
-          .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false"))
-    tableProperties
-      .put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
-        parentTable.getTableInfo.getFactTable.getTableProperties.asScala
-          .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
-            CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT))
-    val parentDictInclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
-      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",")
-
-    val parentDictExclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
-      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",")
-
-    val newDictInclude =
-      parentDictInclude.flatMap(parentcol =>
-        fields.collect {
-          case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
-                      parentcol.equals(fieldRelationMap(col).
-                        columnTableRelationList.get.head.parentColumnName) =>
-            col.column
-        })
-
-    val newDictExclude = parentDictExclude.flatMap(parentcol =>
-      fields.collect {
-        case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
-                    parentcol.equals(fieldRelationMap(col).
-                      columnTableRelationList.get.head.parentColumnName) =>
-          col.column
-      })
-    if (newDictInclude.nonEmpty) {
-      tableProperties
-        .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, newDictInclude.mkString(","))
-    }
-    if (newDictExclude.nonEmpty) {
-      tableProperties
-        .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, newDictExclude.mkString(","))
-    }
+    DataMapUtil
+      .inheritTablePropertiesFromMainTable(parentTable, fields, fieldRelationMap, tableProperties)
     val tableIdentifier =
       TableIdentifier(parentTable.getTableName + "_" + dataMapName,
         Some(parentTable.getDatabaseName))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f41cfc1..6e3e398 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, DataMapUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -81,8 +81,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
       throw new MalformedCarbonCommandException("alter rename is not supported for index datamap")
     }
     // if table have create mv datamap, not support table rename
-    if (CarbonTable.hasMVDataMap(oldCarbonTable)) {
-      throw new MalformedCarbonCommandException("alter rename is not supported for mv datamap")
+    if (DataMapUtil.hasMVDataMap(oldCarbonTable) || oldCarbonTable.isChildTable) {
+      throw new MalformedCarbonCommandException(
+        "alter rename is not supported for datamap table or for tables which have child datamap")
     }
 
     var timeStamp = 0L
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 7d449b5..91d7675 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable
 import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.StructField
-import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -322,7 +322,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
               throw new MalformedCarbonCommandException(
                 "Streaming property value is incorrect")
             }
-            if (CarbonTable.hasMVDataMap(carbonTable)) {
+            if (DataMapUtil.hasMVDataMap(carbonTable)) {
               throw new MalformedCarbonCommandException(
                 "The table which has MV datamap does not support set streaming property")
             }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index d875fdf..96b6000 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
@@ -30,12 +29,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, SparkUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
+import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
@@ -71,7 +69,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
             "Update operation is not supported for pre-aggregate table")
         }
         val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
-        if (CarbonTable.hasMVDataMap(carbonTable)) {
+        if (DataMapUtil.hasMVDataMap(carbonTable)) {
           val allDataMapSchemas = DataMapStoreManager.getInstance
             .getDataMapSchemasOfTable(carbonTable).asScala
             .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
@@ -214,7 +212,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
               "Delete operation is not supported for pre-aggregate table")
           }
           val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
-          if (CarbonTable.hasMVDataMap(carbonTable)) {
+          if (DataMapUtil.hasMVDataMap(carbonTable)) {
             val allDataMapSchemas = DataMapStoreManager.getInstance
               .getDataMapSchemasOfTable(carbonTable).asScala
               .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
new file mode 100644
index 0000000..24e28ad
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.execution.command.{DataMapField, Field}
+
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+
+/**
+ * Utility object holding the methods shared by the pre-aggregate and mv datamaps
+ */
+object DataMapUtil {
+
+  def inheritTablePropertiesFromMainTable(parentTable: CarbonTable,
+      fields: Seq[Field],
+      fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField],
+      tableProperties: mutable.Map[String, String]): Unit = {
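+    // Inherit sort columns: keep only the datamap fields that map directly, without
+    // an aggregate function, to a parent sort column, preserving the parent's order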
+    var neworder = Seq[String]()
+    val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
+    parentOrder.foreach(parentcol =>
+      fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
+                           fieldRelationMap(col).columnTableRelationList.size == 1 &&
+                           parentcol.equals(fieldRelationMap(col).
+                             columnTableRelationList.get(0).parentColumnName))
+        .map(cols => neworder :+= cols.column))
+    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
+    tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
+      getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
+      .LOAD_SORT_SCOPE_DEFAULT))
+    tableProperties
+      .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+    tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
+      parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
+        CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
+
+    // The datamap table name and columns are automatically prefixed with the parent table name
+    // in carbon. For convenience, users can type column names the same as in the select statement
+    // when configuring dmproperties, and here we update the column names with the prefix.
+    // If longStringColumn is not present in the dm properties then we take long_string_columns
+    // from the parent table.
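+    // For example, a parent long_string_column "name" on table "maintable" maps to
+    // the datamap column "maintable_name"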
+    var longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
+    if (longStringColumn.isEmpty) {
+      val longStringColumnInParents = parentTable.getTableInfo.getFactTable.getTableProperties
+        .asScala
+        .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim)
+      val varcharDatamapFields = scala.collection.mutable.ArrayBuffer.empty[String]
+      fieldRelationMap foreach (fields => {
+        val aggFunc = fields._2.aggregateFunction
+        val relationList = fields._2.columnTableRelationList
+        // check if columns present in datamap are long_string_col in parent table. If they are
+        // long_string_columns in parent, make them long_string_columns in datamap
+        if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents
+          .contains(relationList.head.head.parentColumnName)) {
+          varcharDatamapFields += relationList.head.head.parentColumnName
+        }
+      })
+      if (!varcharDatamapFields.isEmpty) {
+        longStringColumn = Option(varcharDatamapFields.mkString(","))
+      }
+    }
+
+    if (longStringColumn != None) {
+      val fieldNames = fields.map(_.column)
+      val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map { colName =>
+        val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
+        if (!fieldNames.contains(newColName)) {
+          throw new MalformedDataMapCommandException(
+            CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + colName
+            + " does not in datamap")
+        }
+        newColName
+      }
+      tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS,
+        newLongStringColumn.mkString(","))
+    }
+
+    // inherit the local dictionary properties of main parent table
+    tableProperties
+      .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+        parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+          .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false"))
+    tableProperties
+      .put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+        parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+          .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+            CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT))
+    val parentDictInclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",")
+
+    val parentDictExclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
+      .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",")
+
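+    // Map the parent's local dictionary include/exclude columns to the corresponding
+    // prefixed datamap column names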
+    val newDictInclude =
+      parentDictInclude.flatMap(parentcol =>
+        fields.collect {
+          case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
+                      fieldRelationMap(col).columnTableRelationList.size == 1 &&
+                      parentcol.equals(fieldRelationMap(col).
+                        columnTableRelationList.get.head.parentColumnName) =>
+            col.column
+        })
+
+    val newDictExclude = parentDictExclude.flatMap(parentcol =>
+      fields.collect {
+        case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
+                    fieldRelationMap(col).columnTableRelationList.size == 1 &&
+                    parentcol.equals(fieldRelationMap(col).
+                      columnTableRelationList.get.head.parentColumnName) =>
+          col.column
+      })
+    if (newDictInclude.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, newDictInclude.mkString(","))
+    }
+    if (newDictExclude.nonEmpty) {
+      tableProperties
+        .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, newDictExclude.mkString(","))
+    }
+  }
+
+  /**
+   * Returns true if an MV datamap is present on the specified table
+   */
+  @throws[IOException]
+  def hasMVDataMap(carbonTable: CarbonTable): Boolean = {
+    DataMapStoreManager.getInstance.getDataMapSchemasOfTable(carbonTable).asScala
+      .exists(_.getProviderName.equalsIgnoreCase(MV.toString))
+  }
+}