You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/21 12:20:41 UTC

[carbondata] branch master updated: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 595b1a4  [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
595b1a4 is described below

commit 595b1a46de30a3daf110f97406f1a2a2ecd82e0a
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Fri Apr 19 12:52:48 2019 +0530

    [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
    
    Support Incremental DataLoad for MV Datamap[with single parent table] for both lazy and non-lazy mv datamap
    New DDL for supporting compaction on mv datamap table.
    Please refer to https://issues.apache.org/jira/browse/CARBONDATA-3296 for more details
    
    This closes #3179
---
 .../core/constants/CarbonCommonConstants.java      |  11 +
 .../carbondata/core/datamap/DataMapProvider.java   | 227 +++++++-
 .../carbondata/core/datamap/DataMapUtil.java       |  20 +
 .../core/datamap/dev/DataMapSyncStatus.java        |  84 +++
 .../datamap/status/DataMapSegmentStatusUtil.java   |  78 +++
 .../core/datamap/status/DataMapStatusManager.java  |  71 ++-
 .../status/DiskBasedDataMapStatusProvider.java     |   7 +
 .../core/metadata/schema/table/CarbonTable.java    |   6 +
 .../core/statusmanager/LoadMetadataDetails.java    |  13 +
 .../carbondata/mv/datamap/MVAnalyzerRule.scala     |  30 +-
 .../carbondata/mv/datamap/MVDataMapProvider.scala  |  82 ++-
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  26 +-
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   |  67 +--
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  | 583 +++++++++++++++++++++
 .../carbondata/mv/rewrite/MVSampleTestCase.scala   |   8 -
 .../carbondata/mv/rewrite/MVTPCDSTestCase.scala    |   9 -
 .../carbondata/mv/rewrite/MVTpchTestCase.scala     |  14 -
 .../src/test/resources/products.csv                |   5 +
 .../src/test/resources/sales_data.csv              |   5 +
 .../carbondata/datamap/IndexDataMapProvider.java   |   9 +-
 .../datamap/PreAggregateDataMapProvider.java       |   9 +-
 .../spark/rdd/CarbonDataRDDFactory.scala           |   9 +
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   8 +-
 .../datamap/CarbonCreateDataMapCommand.scala       |  11 +-
 .../datamap/CarbonDataMapRebuildCommand.scala      |   5 +-
 .../CarbonAlterTableCompactionCommand.scala        |   3 +
 .../management/CarbonCleanFilesCommand.scala       |  14 +
 .../mutation/CarbonProjectForDeleteCommand.scala   |  12 +
 .../mutation/CarbonProjectForUpdateCommand.scala   |  12 +-
 .../execution/command/mv/DataMapListeners.scala    | 141 +++++
 .../spark/sql/hive/CarbonAnalysisRules.scala       |  28 +-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  14 +-
 .../processing/merger/CarbonDataMergerUtil.java    |  24 +-
 .../processing/util/CarbonLoaderUtil.java          |  12 +
 34 files changed, 1520 insertions(+), 137 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8ed9c1d..9375414 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2163,4 +2163,15 @@ public final class CarbonCommonConstants {
 
   public static final int CARBON_INDEX_SERVER_WORKER_THREADS_DEFAULT =
       500;
+
+  /**
+   * This property will be used to store datamap name
+   */
+  public static final String DATAMAP_NAME = "datamap_name";
+
+  /**
+   * This property will be used to store the parent table names associated with the datamap
+   */
+  public static final String PARENT_TABLES = "parent_tables";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index cc05d31..fe2e7dd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -18,13 +18,27 @@
 package org.apache.carbondata.core.datamap;
 
 import java.io.IOException;
+import java.util.*;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.log4j.Logger;
 
 /**
  * DataMap is a accelerator for certain type of query. Developer can add new DataMap
@@ -58,6 +72,8 @@ public abstract class DataMapProvider {
   private CarbonTable mainTable;
   private DataMapSchema dataMapSchema;
 
+  private Logger LOGGER = LogServiceFactory.getLogService(this.getClass().getCanonicalName());
+
   public DataMapProvider(CarbonTable mainTable, DataMapSchema dataMapSchema) {
     this.mainTable = mainTable;
     this.dataMapSchema = dataMapSchema;
@@ -106,13 +122,214 @@ public abstract class DataMapProvider {
    * 1. after datamap creation and if `autoRefreshDataMap` is set to true
    * 2. user manually trigger REBUILD DATAMAP command
    */
-  public abstract void rebuild() throws IOException, NoSuchDataMapException;
+  public boolean rebuild() throws IOException, NoSuchDataMapException {
+    if (null == dataMapSchema.getRelationIdentifier()) {
+      return false;
+    }
+    String newLoadName = "";
+    String segmentMap = "";
+    AbsoluteTableIdentifier dataMapTableAbsoluteTableIdentifier = AbsoluteTableIdentifier
+        .from(dataMapSchema.getRelationIdentifier().getTablePath(),
+            dataMapSchema.getRelationIdentifier().getDatabaseName(),
+            dataMapSchema.getRelationIdentifier().getTableName());
+    SegmentStatusManager segmentStatusManager =
+        new SegmentStatusManager(dataMapTableAbsoluteTableIdentifier);
+    Map<String, List<String>> segmentMapping = new HashMap<>();
+    // Acquire table status lock to handle concurrent dataloading
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info(
+            "Acquired lock for table" + dataMapSchema.getRelationIdentifier().getDatabaseName()
+                + "." + dataMapSchema.getRelationIdentifier().getTableName()
+                + " for table status updation");
+        String dataMapTableMetadataPath =
+            CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
+        LoadMetadataDetails[] loadMetaDataDetails =
+            SegmentStatusManager.readLoadMetadata(dataMapTableMetadataPath);
+        List<LoadMetadataDetails> listOfLoadFolderDetails =
+            new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+        Collections.addAll(listOfLoadFolderDetails, loadMetaDataDetails);
+        if (dataMapSchema.isLazy()) {
+          // check if rebuild to datamap is already in progress and throw exception
+          if (!listOfLoadFolderDetails.isEmpty()) {
+            for (LoadMetadataDetails loadMetaDetail : loadMetaDataDetails) {
+              if ((loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
+                  || loadMetaDetail.getSegmentStatus()
+                  == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) && SegmentStatusManager
+                  .isLoadInProgress(dataMapTableAbsoluteTableIdentifier,
+                      loadMetaDetail.getLoadName())) {
+                throw new RuntimeException("Rebuild to datamap " + dataMapSchema.getDataMapName()
+                    + " is already in progress");
+              }
+            }
+          }
+        }
+        boolean isFullRefresh = false;
+        if (null != dataMapSchema.getProperties().get("full_refresh")) {
+          isFullRefresh = Boolean.valueOf(dataMapSchema.getProperties().get("full_refresh"));
+        }
+        if (!isFullRefresh) {
+          if (!getSpecificSegmentsTobeLoaded(segmentMapping, listOfLoadFolderDetails)) {
+            return false;
+          }
+          segmentMap = new Gson().toJson(segmentMapping);
+        } else {
+          List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+          for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+            List<String> mainTableSegmentList =
+                DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
+            if (mainTableSegmentList.isEmpty()) {
+              return false;
+            }
+            segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+                + relationIdentifier.getTableName(), mainTableSegmentList);
+          }
+          segmentMap = new Gson().toJson(segmentMapping);
+        }
+
+        // To handle concurrent dataloading to datamap, create new loadMetaEntry and
+        // set segmentMap to new loadMetaEntry and pass new segmentId with load command
+        LoadMetadataDetails loadMetadataDetail = new LoadMetadataDetails();
+        String segmentId =
+            String.valueOf(SegmentStatusManager.createNewSegmentId(loadMetaDataDetails));
+        loadMetadataDetail.setLoadName(segmentId);
+        loadMetadataDetail.setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS);
+        loadMetadataDetail.setExtraInfo(segmentMap);
+        listOfLoadFolderDetails.add(loadMetadataDetail);
+        newLoadName = segmentId;
+
+        SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath
+                .getTableStatusFilePath(dataMapSchema.getRelationIdentifier().getTablePath()),
+            listOfLoadFolderDetails
+                .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+      } else {
+        LOGGER.error(
+            "Not able to acquire the lock for Table status updation for table " + dataMapSchema
+                .getRelationIdentifier().getDatabaseName() + "." + dataMapSchema
+                .getRelationIdentifier().getTableName());
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" + dataMapSchema
+            .getRelationIdentifier().getDatabaseName() + "." + dataMapSchema.getRelationIdentifier()
+            .getTableName());
+      } else {
+        LOGGER.error("Unable to unlock Table lock for table" + dataMapSchema.getRelationIdentifier()
+            .getDatabaseName() + "." + dataMapSchema.getRelationIdentifier().getTableName()
+            + " during table status updation");
+      }
+    }
+    return rebuildInternal(newLoadName, segmentMapping);
+  }
 
   /**
-   * Build the datamap incrementally by loading specified segment data
+   * Compares the segment lists of the main table and the dataMap table and loads only the newly
+   * added main table segments into the dataMap table.
+   * If the main table has been compacted, the segments to load are derived from the dataMap to
+   * main table segment mapping.
+   * Eg:
+   * case 1: Consider mainTableSegmentList: {0, 1, 2}, dataMapToMainTable segmentMap:
+   * { 0 -> 0, 1-> 1,2}. If (1, 2) segments of main table are compacted to 1.1 and new segment (3)
+   * is loaded to main table, then mainTableSegmentList will be updated to{0, 1.1, 3}.
+   * In this case, segment (1) of dataMap table will be marked for delete, and new segment
+   * {2 -> 1.1, 3} will be loaded to dataMap table
+   * case 2: Consider mainTableSegmentList: {0, 1, 2, 3}, dataMapToMainTable segmentMap:
+   * { 0 -> 0,1,2, 1-> 3}. If (1, 2) segments of main table are compacted to 1.1 and new segment
+   * (4) is loaded to main table, then mainTableSegmentList will be updated to {0, 1.1, 3, 4}.
+   * In this case, segment (0) of dataMap table will be marked for delete and segment (0) of
+   * main table will be added to validSegmentList which needs to be loaded again. Now, new dataMap
+   * table segment (2) with main table segmentList{2 -> 1.1, 4, 0} will be loaded to dataMap table.
+   * dataMapToMainTable segmentMap will be updated to {1 -> 3, 2 -> 1.1, 4, 0} after rebuild
    */
-  public void incrementalBuild(String[] segmentIds) {
-    throw new UnsupportedOperationException();
+  private boolean getSpecificSegmentsTobeLoaded(Map<String, List<String>> segmentMapping,
+      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+    List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+    // invalidDataMapSegmentList holds segment list which needs to be marked for delete
+    HashSet<String> invalidDataMapSegmentList = new HashSet<>();
+    if (listOfLoadFolderDetails.isEmpty()) {
+      // If the dataMap table has no load entries yet, load all valid segments of the main tables
+      for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+        List<String> mainTableSegmentList =
+            DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
+        // If mainTableSegmentList is empty, no need to trigger load command
+        // TODO: handle in case of multiple tables load to datamap table
+        if (mainTableSegmentList.isEmpty()) {
+          return false;
+        }
+        segmentMapping.put(
+            relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT + relationIdentifier
+                .getTableName(), mainTableSegmentList);
+      }
+    } else {
+      for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+        List<String> dataMapTableSegmentList = new ArrayList<>();
+        for (LoadMetadataDetails loadMetaDetail : listOfLoadFolderDetails) {
+          if (loadMetaDetail.getSegmentStatus() == SegmentStatus.SUCCESS
+              || loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+            Map<String, List<String>> segmentMaps =
+                DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo());
+            dataMapTableSegmentList.addAll(segmentMaps.get(
+                relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+                    + relationIdentifier.getTableName()));
+          }
+        }
+        List<String> dataMapSegmentList = new ArrayList<>(dataMapTableSegmentList);
+        // Get all segments for parent relationIdentifier
+        List<String> mainTableSegmentList =
+            DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
+        dataMapTableSegmentList.removeAll(mainTableSegmentList);
+        mainTableSegmentList.removeAll(dataMapSegmentList);
+        if (mainTableSegmentList.isEmpty()) {
+          return false;
+        }
+        if (!dataMapTableSegmentList.isEmpty()) {
+          List<String> invalidMainTableSegmentList = new ArrayList<>();
+          // validMainTableSegmentList holds segment list which needs to be loaded again
+          HashSet<String> validMainTableSegmentList = new HashSet<>();
+
+          // For dataMap segments which are not in main table segment list(if main table
+          // is compacted), iterate over those segments and get dataMap segments which needs to
+          // be marked for delete and main table segments which needs to be loaded again
+          for (String segmentId : dataMapTableSegmentList) {
+            for (LoadMetadataDetails loadMetaDetail : listOfLoadFolderDetails) {
+              if (loadMetaDetail.getSegmentStatus() == SegmentStatus.SUCCESS
+                  || loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+                Map<String, List<String>> segmentMaps =
+                    DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo());
+                List<String> segmentIds = segmentMaps.get(
+                    relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+                        + relationIdentifier.getTableName());
+                if (segmentIds.contains(segmentId)) {
+                  segmentIds.remove(segmentId);
+                  validMainTableSegmentList.addAll(segmentIds);
+                  invalidMainTableSegmentList.add(segmentId);
+                  invalidDataMapSegmentList.add(loadMetaDetail.getLoadName());
+                }
+              }
+            }
+          }
+          // remove invalid segment from validMainTableSegmentList if present
+          validMainTableSegmentList.removeAll(invalidMainTableSegmentList);
+          // Add all valid segments of main table which needs to be loaded again
+          mainTableSegmentList.addAll(validMainTableSegmentList);
+          segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+              + relationIdentifier.getTableName(), mainTableSegmentList);
+        } else {
+          segmentMapping.put(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+              + relationIdentifier.getTableName(), mainTableSegmentList);
+        }
+      }
+    }
+    // Remove invalid datamap segments
+    if (!invalidDataMapSegmentList.isEmpty()) {
+      for (LoadMetadataDetails loadMetadataDetail : listOfLoadFolderDetails) {
+        if (invalidDataMapSegmentList.contains(loadMetadataDetail.getLoadName())) {
+          loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+        }
+      }
+    }
+    return true;
   }
 
   /**
@@ -126,4 +343,6 @@ public abstract class DataMapProvider {
   public abstract DataMapFactory getDataMapFactory();
 
   public abstract boolean supportRebuild();
+
+  public abstract boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap);
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index 95c69c1..0a604fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -30,7 +30,9 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
@@ -250,4 +252,22 @@ public class DataMapUtil {
     return ssm.getValidAndInvalidSegments();
   }
 
+  /**
+   * Returns valid segment list for a given RelationIdentifier
+   *
+   * @param relationIdentifier get list of segments for relation identifier
+   * @return list of valid segment id's
+   * @throws IOException
+   */
+  public static List<String> getMainTableValidSegmentList(RelationIdentifier relationIdentifier)
+      throws IOException {
+    List<String> segmentList = new ArrayList<>();
+    List<Segment> validSegments = new SegmentStatusManager(AbsoluteTableIdentifier
+        .from(relationIdentifier.getTablePath(), relationIdentifier.getDatabaseName(),
+            relationIdentifier.getTableName())).getValidAndInvalidSegments().getValidSegments();
+    for (Segment segment : validSegments) {
+      segmentList.add(segment.getSegmentNo());
+    }
+    return segmentList;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
new file mode 100644
index 0000000..eb7bf47
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapSyncStatus.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapUtil;
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Abstract helper class to check whether a datamap can be enabled
+ */
+@InterfaceAudience.Developer("DataMap")
+public abstract class DataMapSyncStatus {
+
+  /**
+   * This method checks if main table and datamap table are synchronised or not. If synchronised
+   * return true to enable the datamap
+   *
+   * @param dataMapSchema of datamap to be disabled or enabled
+   * @return flag to enable or disable datamap
+   * @throws IOException
+   */
+  public static boolean canDataMapBeEnabled(DataMapSchema dataMapSchema) throws IOException {
+    boolean isDataMapInSync = true;
+    String metaDataPath =
+        CarbonTablePath.getMetadataPath(dataMapSchema.getRelationIdentifier().getTablePath());
+    LoadMetadataDetails[] dataMapLoadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(metaDataPath);
+    Map<String, List<String>> dataMapSegmentMap = new HashMap<>();
+    for (LoadMetadataDetails loadMetadataDetail : dataMapLoadMetadataDetails) {
+      Map<String, List<String>> segmentMap =
+          DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetail.getExtraInfo());
+      if (dataMapSegmentMap.isEmpty()) {
+        dataMapSegmentMap.putAll(segmentMap);
+      } else {
+        for (Map.Entry<String, List<String>> entry : segmentMap.entrySet()) {
+          if (null != dataMapSegmentMap.get(entry.getKey())) {
+            dataMapSegmentMap.get(entry.getKey()).addAll(entry.getValue());
+          }
+        }
+      }
+    }
+    List<RelationIdentifier> parentTables = dataMapSchema.getParentTables();
+    for (RelationIdentifier parentTable : parentTables) {
+      List<String> mainTableValidSegmentList =
+          DataMapUtil.getMainTableValidSegmentList(parentTable);
+      if (!mainTableValidSegmentList.isEmpty() && !dataMapSegmentMap.isEmpty()) {
+        isDataMapInSync = dataMapSegmentMap.get(
+            parentTable.getDatabaseName() + CarbonCommonConstants.POINT + parentTable
+                .getTableName()).containsAll(mainTableValidSegmentList);
+      } else if (dataMapSegmentMap.isEmpty() && !mainTableValidSegmentList.isEmpty()) {
+        isDataMapInSync = false;
+      }
+    }
+    return isDataMapInSync;
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapSegmentStatusUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapSegmentStatusUtil.java
new file mode 100644
index 0000000..9e00104
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapSegmentStatusUtil.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.status;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+
+import com.google.gson.Gson;
+
+/**
+ * Utility class to get updated segment mapping for datamap table
+ */
+public class DataMapSegmentStatusUtil {
+
+  /**
+   * Converts the given JSON string into a map and returns it as the segmentMap
+   */
+  public static Map<String, List<String>> getSegmentMap(String segmentMap) {
+    Gson gson = new Gson();
+    return gson.fromJson(segmentMap, Map.class);
+  }
+
+  /**
+   * In case of compaction on the dataMap table, this method merges the main table segment lists
+   * of the compacted segments and returns the updated segment mapping
+   *
+   * @param mergedLoadName      to find which all segments are merged to new compacted segment
+   * @param dataMapSchema       of datamap table
+   * @param loadMetadataDetails of datamap table
+   * @return updated segment map after merging segment list
+   */
+  public static String getUpdatedSegmentMap(String mergedLoadName, DataMapSchema dataMapSchema,
+      LoadMetadataDetails[] loadMetadataDetails) {
+    Map<String, List<String>> segmentMapping = new HashMap<>();
+    List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+    for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+      for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+        if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.COMPACTED) {
+          if (mergedLoadName.equalsIgnoreCase(loadMetadataDetail.getMergedLoadName())) {
+            if (segmentMapping.isEmpty()) {
+              segmentMapping.putAll(getSegmentMap(loadMetadataDetail.getExtraInfo()));
+            } else {
+              segmentMapping.get(relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+                  + relationIdentifier.getTableName()).addAll(
+                  getSegmentMap(loadMetadataDetail.getExtraInfo()).get(
+                      relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+                          + relationIdentifier.getTableName()));
+            }
+          }
+        }
+      }
+    }
+    Gson gson = new Gson();
+    return gson.toJson(segmentMapping);
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
index c2f97ea..96b053f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
@@ -23,9 +23,19 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.log4j.Logger;
 
 /**
  * Maintains the status of each datamap. As per the status query will decide whether to hit datamap
@@ -38,6 +48,9 @@ public class DataMapStatusManager {
 
   }
 
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(DataMapStatusManager.class.getName());
+
   /**
    * TODO Use factory when we have more storage providers
    */
@@ -95,9 +108,7 @@ public class DataMapStatusManager {
         DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
     List<DataMapSchema> dataMapToBeDisabled = new ArrayList<>(allDataMapSchemas.size());
     for (DataMapSchema dataMap : allDataMapSchemas) {
-      // TODO all non datamaps like MV is now supports only lazy. Once the support is made the
-      // following check can be removed.
-      if (dataMap.isLazy() || !dataMap.isIndexDataMap()) {
+      if (dataMap.isLazy()) {
         dataMapToBeDisabled.add(dataMap);
       }
     }
@@ -122,9 +133,61 @@ public class DataMapStatusManager {
     }
   }
 
-  private static DataMapSchema getDataMapSchema(String dataMapName)
+  public static DataMapSchema getDataMapSchema(String dataMapName)
       throws IOException, NoSuchDataMapException {
     return DataMapStoreManager.getInstance().getDataMapSchema(dataMapName);
   }
 
+  /**
+   * This method will remove all segments of dataMap table in case of Insert-Overwrite/Update/Delete
+   * operations on main table
+   *
+   * @param allDataMapSchemas of main carbon table
+   * @throws IOException
+   */
+  public static void truncateDataMap(List<DataMapSchema> allDataMapSchemas)
+      throws IOException, NoSuchDataMapException {
+    for (DataMapSchema datamapschema : allDataMapSchemas) {
+      if (!datamapschema.isLazy()) {
+        disableDataMap(datamapschema.getDataMapName());
+      }
+      RelationIdentifier dataMapRelationIdentifier = datamapschema.getRelationIdentifier();
+      SegmentStatusManager segmentStatusManager = new SegmentStatusManager(AbsoluteTableIdentifier
+          .from(dataMapRelationIdentifier.getTablePath(),
+              dataMapRelationIdentifier.getDatabaseName(),
+              dataMapRelationIdentifier.getTableName()));
+      ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+      try {
+        if (carbonLock.lockWithRetries()) {
+          LOGGER.info("Acquired lock for table" + dataMapRelationIdentifier.getDatabaseName() + "."
+              + dataMapRelationIdentifier.getTableName() + " for table status updation");
+          String metaDataPath =
+              CarbonTablePath.getMetadataPath(dataMapRelationIdentifier.getTablePath());
+          LoadMetadataDetails[] loadMetadataDetails =
+              SegmentStatusManager.readLoadMetadata(metaDataPath);
+          for (LoadMetadataDetails entry : loadMetadataDetails) {
+            entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+          }
+          SegmentStatusManager.writeLoadDetailsIntoFile(
+              CarbonTablePath.getTableStatusFilePath(dataMapRelationIdentifier.getTablePath()),
+              loadMetadataDetails);
+        } else {
+          LOGGER.error("Not able to acquire the lock for Table status updation for table "
+              + dataMapRelationIdentifier.getDatabaseName() + "." + dataMapRelationIdentifier
+              .getTableName());
+        }
+      } finally {
+        if (carbonLock.unlock()) {
+          LOGGER.info(
+              "Table unlocked successfully after table status updation" + dataMapRelationIdentifier
+                  .getDatabaseName() + "." + dataMapRelationIdentifier.getTableName());
+        } else {
+          LOGGER.error(
+              "Unable to unlock Table lock for table" + dataMapRelationIdentifier.getDatabaseName()
+                  + "." + dataMapRelationIdentifier.getTableName()
+                  + " during table status updation");
+        }
+      }
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
index 27cb1cc..723010e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
@@ -31,6 +31,7 @@ import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.dev.DataMapSyncStatus;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
@@ -114,6 +115,12 @@ public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvi
       locked = carbonTableStatusLock.lockWithRetries();
       if (locked) {
         LOG.info("Datamap status lock has been successfully acquired.");
+        if (dataMapStatus == DataMapStatus.ENABLED && !dataMapSchemas.get(0).isIndexDataMap()) {
+          // Enable datamap only if datamap tables and main table are in sync
+          if (!DataMapSyncStatus.canDataMapBeEnabled(dataMapSchemas.get(0))) {
+            return;
+          }
+        }
         DataMapStatusDetail[] dataMapStatusDetails = getDataMapStatusDetails();
         List<DataMapStatusDetail> dataMapStatusList = Arrays.asList(dataMapStatusDetails);
         dataMapStatusList = new ArrayList<>(dataMapStatusList);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 0f1f628..afb5fd3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1018,6 +1018,12 @@ public class CarbonTable implements Serializable, Writable {
         !tableInfo.getParentRelationIdentifiers().isEmpty();
   }
 
+  public boolean isChildTable() {
+    return null != tableInfo.getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.PARENT_TABLES) && !tableInfo.getFactTable().getTableProperties()
+        .get(CarbonCommonConstants.PARENT_TABLES).isEmpty();
+  }
+
   /**
    * Return true if this is an external table (table with property "_external"="true", this is
    * an internal table property set during table creation)
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 99dddba..54a6d0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -133,6 +133,11 @@ public class LoadMetadataDetails implements Serializable {
     return partitionCount;
   }
 
+  /**
+   * extraInfo will contain segment mapping Information for datamap table
+   */
+  private String extraInfo;
+
   @Deprecated
   public void setPartitionCount(String partitionCount) {
     this.partitionCount = partitionCount;
@@ -434,4 +439,12 @@ public class LoadMetadataDetails implements Serializable {
     return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", loadName='" + loadName + '\''
         + ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + segmentFile + '\'' + '}';
   }
+
+  public String getExtraInfo() {
+    return extraInfo;
+  }
+
+  public void setExtraInfo(String extraInfo) {
+    this.extraInfo = extraInfo;
+  }
 }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
index bb56e58..315c66b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -27,9 +27,11 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.datamap.DataMapManager
 import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
 
@@ -88,7 +90,8 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     if (!plan.isInstanceOf[Command]  && !plan.isInstanceOf[DeserializeToObject]) {
       val catalogs = extractCatalogs(plan)
       !isDataMapReplaced(catalog.listAllValidSchema(), catalogs) &&
-      isDataMapExists(catalog.listAllValidSchema(), catalogs)
+      isDataMapExists(catalog.listAllValidSchema(), catalogs) &&
+      !isSegmentSetForMainTable(catalogs)
     } else {
       false
     }
@@ -136,4 +139,29 @@ class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     }
     catalogs
   }
+
+  /**
+   * Check if any segments are set for main table for Query. If any segments are set, then
+   * skip mv datamap table for query
+   *
+   * @param catalogs catalog tables extracted from the query plan; undefined entries are ignored
+   * @return true if input segments are set in the session for at least one of the tables
+   */
+  def isSegmentSetForMainTable(catalogs: Seq[Option[CatalogTable]]): Boolean = {
+    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (carbonSessionInfo == null) {
+      false
+    } else {
+      // Inspect every table of the plan, not only the first one, and skip undefined entries
+      // instead of calling get on them.
+      catalogs.flatten.exists { table =>
+        val segmentsToQuery = carbonSessionInfo.getSessionParams
+          .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+                       table.identifier.database.get + "." +
+                       table.identifier.table, "")
+        !(segmentsToQuery.isEmpty || segmentsToQuery.equalsIgnoreCase("*"))
+      }
+    }
+  }
+
 }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 5ffc46a..26c4cb6 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -18,7 +18,9 @@ package org.apache.carbondata.mv.datamap
 
 import java.io.IOException
 
-import org.apache.spark.sql.SparkSession
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
@@ -29,8 +31,11 @@ import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.annotations.InterfaceAudience
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
@@ -43,6 +48,8 @@ class MVDataMapProvider(
   extends DataMapProvider(mainTable, dataMapSchema) {
   protected var dropTableCommand: CarbonDropTableCommand = null
 
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   @throws[MalformedDataMapCommandException]
   @throws[IOException]
   override def initMeta(ctasSqlStatement: String): Unit = {
@@ -55,6 +62,11 @@ class MVDataMapProvider(
   }
 
   override def initData(): Unit = {
+    if (!dataMapSchema.isLazy) {
+      if (rebuild()) {
+        DataMapStatusManager.enableDataMap(dataMapSchema.getDataMapName)
+      }
+    }
   }
 
   @throws[IOException]
@@ -75,7 +87,8 @@ class MVDataMapProvider(
   }
 
   @throws[IOException]
-  override def rebuild(): Unit = {
+  override def rebuildInternal(newLoadName: String,
+      segmentMap: java.util.Map[String, java.util.List[String]]): Boolean = {
     val ctasQuery = dataMapSchema.getCtasQuery
     if (ctasQuery != null) {
       val identifier = dataMapSchema.getRelationIdentifier
@@ -91,29 +104,78 @@ class MVDataMapProvider(
       val queryPlan = SparkSQLUtil.execute(
         sparkSession.sql(updatedQuery).queryExecution.analyzed,
         sparkSession).drop("preAgg")
-      val header = logicalPlan.output.map(_.name).mkString(",")
+      var isOverwriteTable = false
+      val isFullRefresh =
+        if (null != dataMapSchema.getProperties.get("full_refresh")) {
+          dataMapSchema.getProperties.get("full_refresh").toBoolean
+        } else {
+          false
+        }
+      if (isFullRefresh) {
+        isOverwriteTable = true
+      }
+      val dataMapTable = CarbonTable
+        .buildFromTablePath(identifier.getTableName,
+          identifier.getDatabaseName,
+          identifier.getTablePath,
+          identifier.getTableId)
+      // Set specified segments for incremental load
+      val segmentMapIterator = segmentMap.entrySet().iterator()
+      while (segmentMapIterator.hasNext) {
+        val entry = segmentMapIterator.next()
+        setSegmentsToLoadDataMap(entry.getKey, entry.getValue)
+      }
+      val header = dataMapTable.getTableInfo.getFactTable.getListOfColumns.asScala
+        .filter { column =>
+          !column.getColumnName
+            .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
+        }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         factPathFromUser = null,
         dimFilesPath = Seq(),
         options = scala.collection.immutable.Map("fileheader" -> header),
-        isOverwriteTable = true,
+        isOverwriteTable,
         inputSqlString = null,
         dataFrame = Some(queryPlan),
         updateModel = None,
         tableInfoOp = None,
-        internalOptions = Map.empty,
+        internalOptions = Map("mergedSegmentName" -> newLoadName),
         partition = Map.empty)
 
-      SparkSQLUtil.execute(loadCommand, sparkSession)
+      try {
+        SparkSQLUtil.execute(loadCommand, sparkSession)
+      } catch {
+        case ex: Exception =>
+          DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          LOGGER.error("Data Load failed for DataMap: ", ex)
+          return false
+      } finally {
+        unsetMainTableSegments()
+      }
     }
+    true
   }
 
-  @throws[IOException]
-  override def incrementalBuild(
-      segmentIds: Array[String]): Unit = {
-    throw new UnsupportedOperationException
+  /**
+   * Sets the given main table segments as CARBON_INPUT_SEGMENTS thread property of that table
+   */
+  private def setSegmentsToLoadDataMap(tableUniqueName: String,
+      mainTableSegmentList: java.util.List[String]): Unit = {
+    val segmentsToLoad = mainTableSegmentList.asScala.mkString(",")
+    CarbonSession.threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + tableUniqueName,
+      segmentsToLoad)
+  }
+
+  private def unsetMainTableSegments(): Unit = {
+    // clear the CARBON_INPUT_SEGMENTS thread property of every parent table of this datamap
+    dataMapSchema.getParentTables.asScala.foreach { parentTable =>
+      val parentTableUniqueName =
+        parentTable.getDatabaseName + "." + parentTable.getTableName
+      CarbonSession.threadUnset(
+        CarbonCommonConstants.CARBON_INPUT_SEGMENTS + parentTableUniqueName)
+    }
+  }
 
   override def createDataMapCatalog : DataMapCatalog[SummaryDataset] =
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 9bed098..6d0b2d3 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -27,14 +27,17 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.datamap.DataMapManager
@@ -81,6 +84,7 @@ object MVHelper {
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
 
     val selectTables = getTables(logicalPlan)
+    val parentTables = new util.ArrayList[String]()
     selectTables.foreach { selectTable =>
       val mainCarbonTable = try {
         Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
@@ -89,12 +93,14 @@ object MVHelper {
         // Exception handling if it's not a CarbonTable
         case ex : Exception => None
       }
-
+      parentTables.add(mainCarbonTable.get.getTableName)
       if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
         throw new MalformedCarbonCommandException(
           s"Streaming table does not support creating MV datamap")
       }
     }
+    tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
+    tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
 
     // TODO inherit the table properties like sort order, sort scope and block size from parent
     // tables to mv datamap table
@@ -127,14 +133,20 @@ object MVHelper {
     dataMapSchema
       .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
         tableIdentifier.table,
-        ""))
+        CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession).getTableId))
 
     val parentIdents = selectTables.map { table =>
-      new RelationIdentifier(table.database, table.identifier.table, "")
+      val relationIdentifier = new RelationIdentifier(table.database, table.identifier.table, "")
+      relationIdentifier.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
+      relationIdentifier
     }
+    dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
     dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
     dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString)
     DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
+    if (dataMapSchema.isLazy) {
+      DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+    }
   }
 
   private def validateMVQuery(sparkSession: SparkSession,
@@ -241,6 +253,12 @@ object MVHelper {
         }.isDefined || isFullReload
         exp
     }
+    // TODO:- Remove this case when incremental dataloading is supported for multiple tables
+    logicalPlan.transformDown {
+      case join@Join(l1, l2, jointype, condition) =>
+        isFullReload = true
+        join
+    }
     isFullReload
   }
 
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 16c0567..5016bbe 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -105,7 +105,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and same projection") {
     sql("drop datamap if exists datamap1")
     sql("create datamap datamap1 using 'mv' as select empname, designation from fact_table1")
-    sql(s"rebuild datamap datamap1")
     val df = sql("select empname,designation from fact_table1")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap1"))
@@ -116,7 +115,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub projection") {
     sql("drop datamap if exists datamap2")
     sql("create datamap datamap2 using 'mv' as select empname, designation from fact_table1")
-    sql(s"rebuild datamap datamap2")
     val df = sql("select empname from fact_table1")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap2"))
@@ -127,7 +125,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and same projection with projection filter") {
     sql("drop datamap if exists datamap3")
     sql("create datamap datamap3 using 'mv' as select empname, designation from fact_table1")
-    sql(s"rebuild datamap datamap3")
     val frame = sql("select empname, designation from fact_table1 where empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap3"))
@@ -138,7 +135,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and sub projection with non projection filter") {
     sql("create datamap datamap4 using 'mv' as select empname, designation from fact_table1")
-    sql(s"rebuild datamap datamap4")
     val frame = sql("select designation from fact_table1 where empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap4"))
@@ -148,7 +144,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and sub projection with datamap filter") {
     sql("create datamap datamap5 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
-    sql(s"rebuild datamap datamap5")
     val frame = sql("select designation from fact_table1 where empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -158,7 +153,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and same projection with datamap filter ") {
     sql("create datamap datamap6 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
-    sql(s"rebuild datamap datamap6")
     val frame = sql("select empname,designation from fact_table1 where empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap6"))
@@ -168,7 +162,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and same projection with datamap filter and extra query column filter") {
     sql("create datamap datamap7 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
-    sql(s"rebuild datamap datamap7")
     val frame = sql(
       "select empname,designation from fact_table1 where empname='shivani' and designation='SA'")
     val analyzed = frame.queryExecution.analyzed
@@ -179,7 +172,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and same projection with datamap filter and different column filter") {
     sql("create datamap datamap8 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
-    sql(s"rebuild datamap datamap8")
     val frame = sql("select empname,designation from fact_table1 where designation='SA'")
     val analyzed = frame.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap8"))
@@ -189,7 +181,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and same projection with datamap filter on non projection column and extra column filter") {
     sql("create datamap datamap9 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
-    sql(s"rebuild datamap datamap9")
     val frame = sql("select empname,designation from fact_table1 where deptname='cloud'")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -199,7 +190,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and same projection with datamap filter on non projection column and no column filter") {
     sql("create datamap datamap10 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
-    sql(s"rebuild datamap datamap10")
     val frame = sql("select empname,designation from fact_table1")
     val analyzed = frame.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap10"))
@@ -209,7 +199,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and same projection with datamap filter on non projection column and different column filter") {
     sql("create datamap datamap11 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
-    sql(s"rebuild datamap datamap11")
     val frame = sql("select empname,designation from fact_table1 where designation='SA'")
     val analyzed = frame.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap11"))
@@ -220,7 +209,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and same group by query") {
     sql("drop datamap if exists datamap12")
     sql("create datamap datamap12 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap12")
     val frame = sql("select empname, sum(utilization) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap12"))
@@ -231,7 +219,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub group by query") {
     sql("drop datamap if exists datamap13")
     sql("create datamap datamap13 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap13")
     val frame = sql("select sum(utilization) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap13"))
@@ -242,7 +229,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub group by query with filter on query") {
     sql("drop datamap if exists datamap14")
     sql("create datamap datamap14 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap14")
     val frame = sql(
       "select empname,sum(utilization) from fact_table1 group by empname having empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
@@ -254,7 +240,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub group and sub projection by query with filter on query") {
     sql("drop datamap if exists datamap32")
     sql("create datamap datamap32 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap32")
     val frame = sql(
       "select empname, sum(utilization) from fact_table1 group by empname having empname='shivani'")
     val analyzed = frame.queryExecution.analyzed
@@ -265,7 +250,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and sub group by query with filter on datamap") {
     sql("create datamap datamap15 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname")
-    sql(s"rebuild datamap datamap15")
     val frame = sql(
       "select empname,sum(utilization) from fact_table1 where empname='shivani' group by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -276,7 +260,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and sub group by query with filter on datamap and no filter on query") {
     sql("create datamap datamap16 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname")
-    sql(s"rebuild datamap datamap16")
     val frame = sql("select empname,sum(utilization) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap16"))
@@ -286,7 +269,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with simple and same group by with expression") {
     sql("create datamap datamap17 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap17")
     val frame = sql(
       "select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group" +
       " by empname")
@@ -300,7 +282,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub group by with expression") {
     sql("drop datamap if exists datamap18")
     sql("create datamap datamap18 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap18")
     val frame = sql(
       "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -312,7 +293,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub count group by with expression") {
     sql("drop datamap if exists datamap19")
     sql("create datamap datamap19 using 'mv' as select empname, count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap19")
     val frame = sql(
       "select count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -324,7 +304,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub group by with expression and filter on query") {
     sql("drop datamap if exists datamap20")
     sql("create datamap datamap20 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap20")
     val frame = sql(
       "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 where " +
       "empname='shivani' group by empname")
@@ -338,7 +317,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple join") {
     sql("drop datamap if exists datamap21")
     sql("create datamap datamap21 using 'mv' as select t1.empname as c1, t2.designation, t2.empname as c2 from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname)")
-    sql(s"rebuild datamap datamap21")
     val frame = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
@@ -350,7 +328,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple join and filter on query") {
     sql("drop datamap if exists datamap22")
     sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
-    sql(s"rebuild datamap datamap22")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
       "t2.empname and t1.empname='shivani'")
@@ -365,7 +342,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple join and filter on query and datamap") {
     sql("drop datamap if exists datamap23")
     sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
-    sql(s"rebuild datamap datamap23")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " +
       "t2.empname and t1.empname='shivani'")
@@ -379,7 +355,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple join and filter on datamap and no filter on query") {
     sql("drop datamap if exists datamap24")
     sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'")
-    sql(s"rebuild datamap datamap24")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
@@ -391,7 +366,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with multiple join") {
     sql("drop datamap if exists datamap25")
     sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3  on (t1.empname=t3.empname)")
-    sql(s"rebuild datamap datamap25")
     val frame = sql(
       "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname")
     val analyzed = frame.queryExecution.analyzed
@@ -406,7 +380,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   ignore("test create datamap with simple join on datamap and multi join on query") {
     sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)")
-    sql(s"rebuild datamap datamap26")
     val frame = sql(
       "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2,fact_table3 " +
       "t3  where t1.empname = t2.empname and t1.empname=t3.empname")
@@ -419,7 +392,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("test create datamap with join with group by") {
     sql("create datamap datamap27 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap27")
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname group by t1.empname, t2.designation")
@@ -433,7 +405,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with join with group by and sub projection") {
     sql("drop datamap if exists datamap28")
     sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap28")
     val frame = sql(
       "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where " +
       "t1.empname = t2.empname group by t2.designation")
@@ -447,7 +418,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with join with group by and sub projection with filter") {
     sql("drop datamap if exists datamap29")
     sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2  on (t1.empname = t2.empname) group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap29")
     val frame = sql(
       "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where " +
       "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation")
@@ -461,7 +431,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with join with group by with filter") {
     sql("drop datamap if exists datamap30")
     sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap30")
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname and t2.designation='SA' group by t1.empname, t2.designation")
@@ -475,7 +444,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with expression on projection") {
     sql(s"drop datamap if exists datamap31")
     sql("create datamap datamap31 using 'mv' as select empname, designation, utilization, projectcode from fact_table1 ")
-    sql(s"rebuild datamap datamap31")
     val frame = sql(
       "select empname, designation, utilization+projectcode from fact_table1")
     val analyzed = frame.queryExecution.analyzed
@@ -487,7 +455,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with simple and sub group by query and count agg") {
     sql(s"drop datamap if exists datamap32")
     sql("create datamap datamap32 using 'mv' as select empname, count(utilization) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap32")
     val frame = sql("select empname,count(utilization) from fact_table1 where empname='shivani' group by empname")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap32"))
@@ -498,7 +465,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with simple and sub group by query and avg agg") {
     sql(s"drop datamap if exists datamap33")
     sql("create datamap datamap33 using 'mv' as select empname, avg(utilization) from fact_table1 group by empname")
-    sql(s"rebuild datamap datamap33")
     val frame = sql("select empname,avg(utilization) from fact_table1 where empname='shivani' group by empname")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap33"))
@@ -509,7 +475,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with left join with group by") {
     sql("drop datamap if exists datamap34")
     sql("create datamap datamap34 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap34")
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname group by t1.empname, t2.designation")
@@ -522,7 +487,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   ignore("test create datamap with simple and group by query with filter on datamap but not on projection") {
     sql("create datamap datamap35 using 'mv' as select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation")
-    sql(s"rebuild datamap datamap35")
     val frame = sql(
       "select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation")
     val analyzed = frame.queryExecution.analyzed
@@ -533,7 +497,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
   ignore("test create datamap with simple and sub group by query with filter on datamap but not on projection") {
     sql("create datamap datamap36 using 'mv' as select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation")
-    sql(s"rebuild datamap datamap36")
     val frame = sql(
       "select sum(utilization) from fact_table1 where empname='shivani' group by designation")
     val analyzed = frame.queryExecution.analyzed
@@ -545,7 +508,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with agg push join with sub group by ") {
     sql("drop datamap if exists datamap37")
     sql("create datamap datamap37 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation")
-    sql(s"rebuild datamap datamap37")
     val frame = sql(
       "select t1.empname, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname group by t1.empname")
@@ -559,7 +521,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with agg push join with group by ") {
     sql("drop datamap if exists datamap38")
     sql("create datamap datamap38 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation")
-    sql(s"rebuild datamap datamap38")
     val frame = sql(
       "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname group by t1.empname,t1.designation")
@@ -573,7 +534,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with agg push join with group by with filter") {
     sql("drop datamap if exists datamap39")
     sql("create datamap datamap39 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation ")
-    sql(s"rebuild datamap datamap39")
     val frame = sql(
       "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname,t1.designation")
@@ -587,7 +547,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with more agg push join with group by with filter") {
     sql("drop datamap if exists datamap40")
     sql("create datamap datamap40 using 'mv' as select empname, designation, sum(utilization), count(utilization) from fact_table1 group by empname, designation ")
-    sql(s"rebuild datamap datamap40")
     val frame = sql(
       "select t1.empname, t1.designation, sum(t1.utilization),count(t1.utilization) from fact_table1 t1,fact_table2 t2  " +
       "where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname,t1.designation")
@@ -601,7 +560,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with left join with group by with filter") {
     sql("drop datamap if exists datamap41")
     sql("create datamap datamap41 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap41")
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, t2.designation")
@@ -615,7 +573,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with left join with sub group by") {
     sql("drop datamap if exists datamap42")
     sql("create datamap datamap42 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap42")
     val frame = sql(
       "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname group by t1.empname")
@@ -629,7 +586,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with left join with sub group by with filter") {
     sql("drop datamap if exists datamap43")
     sql("create datamap datamap43 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap43")
     val frame = sql(
       "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname")
@@ -643,7 +599,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with left join with sub group by with filter on mv") {
     sql("drop datamap if exists datamap44")
     sql("create datamap datamap44 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap44")
     val frame = sql(
       "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  " +
       "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname")
@@ -658,7 +613,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap if exists datamap45")
 
     sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
-    sql(s"rebuild datamap datamap45")
     // During spark optimizer it converts the left outer join queries with equi join if any filter present on right side table
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2  " +
@@ -678,7 +632,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql(" insert into test4 select 'babu',12,12").show()
     sql("create datamap mv13 using 'mv' as select name,sum(salary) from test4 group by name")
-    sql("rebuild datamap mv13")
     val frame = sql(
       "select name,sum(salary) from test4 group by name")
     val analyzed = frame.queryExecution.analyzed
@@ -689,7 +642,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap if exists MV_order")
     sql("create datamap MV_order using 'mv' as select empname,sum(salary) as total from fact_table1 group by empname")
-    sql("rebuild datamap MV_order")
     val frame = sql(
       "select empname,sum(salary) as total from fact_table1 group by empname order by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -701,7 +653,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap if exists MV_order")
     sql("drop datamap if exists MV_desc_order")
     sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname")
-    sql("rebuild datamap MV_order")
     val frame = sql(
       "select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -712,7 +663,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap if exists MV_order")
     sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC")
-    sql("rebuild datamap MV_order")
     val frame = sql(
       "select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC")
     val analyzed = frame.queryExecution.analyzed
@@ -724,7 +674,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap if exists MV_order")
     sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC")
-    sql("rebuild datamap MV_order")
     val frame = sql(
       "select empname,sum(salary)+sum(utilization) as total from fact_table1 where empname = 'ravi' group by empname order by empname DESC")
     val analyzed = frame.queryExecution.analyzed
@@ -739,14 +688,13 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("create table test1( name string,country string,age int,salary int) stored by 'carbondata'")
     sql("insert into test1 select 'name1','USA',12,23")
     sql("create datamap datamv2 using 'mv' as select country,sum(salary) from test1 group by country")
-    sql("rebuild datamap datamv2")
     val frame = sql("select country,sum(salary) from test1 where country='USA' group by country")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamv2"))
     sql("insert into test1 select 'name1','USA',12,23")
     val frame1 = sql("select country,sum(salary) from test1 where country='USA' group by country")
     val analyzed1 = frame1.queryExecution.analyzed
-    assert(!verifyMVDataMap(analyzed1, "datamv2"))
+    assert(verifyMVDataMap(analyzed1, "datamv2"))
     sql("drop datamap if exists datamv2")
     sql("drop table if exists test1")
   }
@@ -755,7 +703,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap if exists MV_exp")
     sql("create datamap MV_exp using 'mv' as select sum(salary),substring(empname,2,5),designation from fact_table1 group by substring(empname,2,5),designation")
-    sql("rebuild datamap MV_exp")
     val frame = sql(
       "select sum(salary),substring(empname,2,5),designation from fact_table1 group by substring(empname,2,5),designation")
     val analyzed = frame.queryExecution.analyzed
@@ -776,7 +723,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     sql("drop datamap if exists MV_exp")
     sql("create datamap MV_exp using 'mv' as select doj,sum(salary) from xy.fact_tablexy group by doj")
-    sql("rebuild datamap MV_exp")
     val frame = sql(
       "select doj,sum(salary) from xy.fact_tablexy group by doj")
     val analyzed = frame.queryExecution.analyzed
@@ -795,7 +741,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(" insert into mvtable1 select 'n3',12,12")
     sql(" insert into mvtable1 select 'n4',12,12")
     sql("create datamap map1 using 'mv' as select name,sum(salary) from mvtable1 group by name")
-    sql("rebuild datamap map1")
     val frame = sql("select name,sum(salary) from mvtable1 group by name limit 1")
     val analyzed = frame.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "map1"))
@@ -807,7 +752,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap if exists datamap_comp_maxsumminavg")
     sql("create datamap datamap_comp_maxsumminavg using 'mv' as select empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname")
-    sql("rebuild datamap datamap_comp_maxsumminavg")
     val frame = sql(
       "select empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -821,7 +765,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     intercept[UnsupportedOperationException] {
       sql(
         "create datamap mv_unional using 'mv' as Select Z.deptname From (Select deptname,empname From fact_table1 Union All Select deptname,empname from fact_table2) Z")
-      sql("rebuild datamap mv_unional")
     }
     sql("drop datamap if exists mv_unional")
   }
@@ -833,7 +776,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
       sql(
         "create datamap MV_exp using 'mv' as select sum(case when deptno=11 and (utilization=92) then salary else 0 end) as t from fact_table1 group by empname")
 
-      sql("rebuild datamap MV_exp")
       val frame = sql(
         "select sum(case when deptno=11 and (utilization=92) then salary else 0 end) as t from fact_table1 group by empname")
       val analyzed = frame.queryExecution.analyzed
@@ -853,7 +795,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     }
     sql("show datamap").show()
-    sql("rebuild datamap MV_exp1")
     val frame = sql(
       "select empname, sum(utilization) from fact_table1 group by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -866,7 +807,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap if exists datamap46")
     sql("create datamap datamap46 using 'mv' as select deptname, sum(salary) from fact_table1 group by deptname")
-    sql("rebuild datamap datamap46")
     val frame = sql(
       "select deptname as babu, sum(salary) from fact_table1 as tt group by deptname")
     val analyzed = frame.queryExecution.analyzed
@@ -878,7 +818,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
     sql("drop datamap if exists datamap_subqry")
     sql("create datamap datamap_subqry using 'mv' as select empname, min(salary) from fact_table1 group by empname")
-    sql("rebuild datamap datamap_subqry")
     val frame = sql(
       "SELECT max(utilization) FROM fact_table1 WHERE salary IN (select min(salary) from fact_table1 group by empname ) group by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -889,10 +828,8 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   test("jira carbondata-2539-1") {
     sql("drop datamap if exists datamap_comp_maxsumminavg")
     sql("create datamap datamap_comp_maxsumminavg using 'mv' as select empname,max(projectenddate),sum(salary),min(projectjoindate),avg(attendance) from fact_table1 group by empname")
-    sql("rebuild datamap datamap_comp_maxsumminavg")
     sql("drop datamap if exists datamap_subqry")
     sql("create datamap datamap_subqry using 'mv' as select min(salary) from fact_table1")
-    sql("rebuild datamap datamap_subqry")
     val frame = sql(
       "SELECT max(utilization) FROM fact_table1 WHERE salary IN (select min(salary) from fact_table1) group by empname")
     val analyzed = frame.queryExecution.analyzed
@@ -985,7 +922,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap if exists dm_stream_test3")
     sql("create datamap dm_stream_test3 using 'mv' as select empname, sum(utilization) from " +
         "fact_table1 group by empname")
-    sql("rebuild datamap dm_stream_test3")
     val exception_tb_mv3: Exception = intercept[Exception] {
       sql("alter table fact_table1 set tblproperties('streaming'='true')")
     }
@@ -1009,7 +945,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql("insert into all_table select 1,1,null,1,1,1,null,1,1,1,1,1,1,1,1,1,1,1,1")
 
     sql("create datamap all_table_mv on table all_table using 'mv' as " + querySQL)
-    sql("rebuild datamap all_table_mv")
 
     val frame = sql(querySQL)
     val analyzed = frame.queryExecution.analyzed
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
new file mode 100644
index 0000000..bbd7b4c
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -0,0 +1,583 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+
+
+/**
+ * Test class to verify incremental data load on an MV datamap.
+ */
+
+class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = { // clean slate: drop any leftover tables with these names
+    sql("drop table IF EXISTS test_table")
+    sql("drop table IF EXISTS test_table1")
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS dimensiontable")
+    sql("drop table if exists products")
+    sql("drop table if exists sales")
+    sql("drop table if exists products1")
+    sql("drop table if exists sales1")
+  }
+
+  test("test Incremental Loading on rebuild MV Datamap") {
+    // test_table1 mirrors test_table (same helpers) but never gets an MV; it is the baseline
+    createTableFactTable("test_table")
+    loadDataToFactTable("test_table")
+    createTableFactTable("test_table1")
+    loadDataToFactTable("test_table1")
+    // create datamap1 on test_table 'with deferred rebuild'; it is rebuilt explicitly below
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 using 'mv' with deferred rebuild as select empname, designation " +
+      "from test_table")
+    val query: String = "select empname from test_table"
+    val df1 = sql(s"$query")
+    val analyzed1 = df1.queryExecution.analyzed // plan handed to verifyMVDataMap (helper not shown here)
+    assert(!verifyMVDataMap(analyzed1, "datamap1")) // expected false before the first rebuild
+    sql(s"rebuild datamap datamap1") // first rebuild; its entry is read as loadMetadataDetails(0)
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table"
+    )
+    var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(0).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]() // expected parent-table segment ids
+    segmentList.add("0")
+    assert(segmentList.containsAll( segmentMap.get("default.test_table"))) // NOTE(review): subset check only; an empty mapping passes
+    val df2 = sql(s"$query")
+    val analyzed2 = df2.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed2, "datamap1")) // expected true once rebuilt
+    loadDataToFactTable("test_table") // further load on the MV's parent table
+    loadDataToFactTable("test_table1")
+    sql(s"rebuild datamap datamap1") // second rebuild; its entry is read as loadMetadataDetails(1)
+    loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+    segmentList.clear()
+    segmentList.add("1") // the second entry's mapping is expected to stay within segment 1
+    assert(segmentList.containsAll( segmentMap.get("default.test_table")))
+    checkAnswer(sql("select empname, designation from test_table"), // result must match the baseline table
+      sql("select empname, designation from test_table1"))
+    val df3 = sql(s"$query")
+    val analyzed3 = df3.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed3, "datamap1"))
+    loadDataToFactTable("test_table") // load again, this time without a rebuild afterwards
+    loadDataToFactTable("test_table1")
+    val df4 = sql(s"$query")
+    val analyzed4 = df4.queryExecution.analyzed
+    assert(!verifyMVDataMap(analyzed4, "datamap1")) // expected false: data loaded after the last rebuild
+    checkAnswer(sql("select empname, designation from test_table"),
+      sql("select empname, designation from test_table1"))
+  }
+
+  test("test MV incremental loading with main table having Marked for Delete segments") {
+    createTableFactTable("test_table") // test_table gets the MV; test_table1 is the baseline
+    loadDataToFactTable("test_table")
+    createTableFactTable("test_table1")
+    loadDataToFactTable("test_table1")
+    loadDataToFactTable("test_table") // second load call per table
+    loadDataToFactTable("test_table1")
+    sql("Delete from table test_table where segment.id in (0)") // remove segment 0 before the MV exists
+    sql("Delete from table test_table1 where segment.id in (0)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv'  with deferred rebuild as select empname, designation " +
+      "from test_table")
+    loadDataToFactTable("test_table") // third load call, issued after the datamap was created
+    loadDataToFactTable("test_table1")
+    sql(s"rebuild datamap datamap1") // single rebuild; its entry is read as loadMetadataDetails(0)
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table")
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(0).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]() // expected ids: 1 and 2, not the deleted 0
+    segmentList.add("1")
+    segmentList.add("2")
+    assert(segmentList.containsAll( segmentMap.get("default.test_table"))) // NOTE(review): subset check only; an empty mapping passes
+    checkAnswer(sql("select empname, designation from test_table"), // result must match the baseline table
+      sql("select empname, designation from test_table1"))
+    dropTable("test_table") // dropTable is a helper defined outside this view
+    dropTable("test_table1")
+  }
+
+  test("test MV incremental loading with update operation on main table") {
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS testtable")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'") // gets the MV
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql("create table testtable(a string,b string,c int) stored by 'carbondata'") // baseline, no MV
+    sql("insert into testtable values('a','abc',1)")
+    sql("insert into testtable values('b','bcd',2)")
+    sql("drop datamap if exists datamap1") // NOTE(review): the MV below sums b, declared string above; confirm intended
+    sql("create datamap datamap1 using 'mv' with deferred rebuild as select a, sum(b) from main_table group by a")
+    sql(s"rebuild datamap datamap1") // first rebuild; its entry is read as loadMetadataDetails(0)
+    var df = sql(
+      s"""select a, sum(b) from main_table group by a""".stripMargin)
+    var analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap1")) // verifyMVDataMap is a helper defined outside this view
+    checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+      sql(" select a, sum(b) from main_table group by a"))
+    sql("update main_table set(a) = ('aaa') where b = 'abc'").show(false) // update the MV's parent table
+    sql("update testtable set(a) = ('aaa') where b = 'abc'").show(false) // same update on the baseline
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table"
+    )
+    var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE) // checked after the update, before any rebuild
+    checkAnswer(sql("select * from main_table"), sql("select * from testtable"))
+    sql(s"rebuild datamap datamap1") // rebuild after the update; new entry read as loadMetadataDetails(1)
+    loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]() // expected main_table segment ids for the new entry
+    segmentList.add("0")
+    segmentList.add("1")
+    assert(segmentList.containsAll( segmentMap.get("default.main_table"))) // NOTE(review): subset check only; an empty mapping passes
+    df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
+    analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap1")) // expected true again after the rebuild
+    checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+      sql(" select a, sum(b) from main_table group by a"))
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS testtable")
+  }
+
+  // Runs the `alter datamap ... compact 'major'` DDL on a lazy MV datamap that has been
+  // rebuilt three times and checks that a merged segment "0.1" shows up afterwards.
+  test("test compaction on mv datamap table") {
+    createTableFactTable("test_table")
+    loadDataToFactTable("test_table")
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 using 'mv'  with deferred rebuild  as select empname, designation " +
+      "from test_table")
+    loadDataToFactTable("test_table")
+    sql(s"rebuild datamap datamap1")
+    loadDataToFactTable("test_table")
+    sql(s"rebuild datamap datamap1")
+    loadDataToFactTable("test_table")
+    sql(s"rebuild datamap datamap1")
+    // no merged segment may exist before the explicit compaction
+    checkExistence(sql("show segments for table datamap1_table"), false, "0.1")
+    sql("alter datamap datamap1 compact 'major'")
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table"
+    )
+    // the load entry at index 3 is expected to carry the parent-segment mapping after compaction
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(3).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]()
+    segmentList.add("0")
+    segmentList.add("1")
+    segmentList.add("2")
+    segmentList.add("3")
+    // NOTE(review): subset check only - passes as long as no unexpected parent segment is recorded
+    assert(segmentList.containsAll( segmentMap.get("default.test_table")))
+    checkExistence(sql("show segments for table datamap1_table"), true, "0.1")
+    sql("clean files for table datamap1_table")
+    sql("drop table IF EXISTS test_table")
+  }
+
+  // Enables auto load merge, performs seven loads on the parent table with a rebuild of the
+  // lazy MV datamap after each of the last six, and checks the parent-segment mapping recorded
+  // on the datamap table's load entry at index 2.
+  test("test auto-compaction on mv datamap table") {
+    sql("set carbon.enable.auto.load.merge=true")
+    try {
+      createTableFactTable("test_table")
+      loadDataToFactTable("test_table")
+      sql("drop datamap if exists datamap1")
+      sql(
+        "create datamap datamap1 using 'mv'  with deferred rebuild as select empname, designation " +
+        "from test_table")
+      // six load + rebuild rounds, same statements and order as before
+      (1 to 6).foreach { _ =>
+        loadDataToFactTable("test_table")
+        sql("rebuild datamap datamap1")
+      }
+      sql("clean files for table datamap1_table")
+      sql("clean files for table test_table")
+      val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+        CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+        "datamap1_table")
+      val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+      val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(2).getExtraInfo)
+      val segmentList = new java.util.ArrayList[String]()
+      segmentList.add("0.1")
+      segmentList.add("4")
+      segmentList.add("5")
+      segmentList.add("6")
+      // NOTE(review): subset check only - a shorter recorded list would still pass
+      assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+    } finally {
+      // The property is session wide; switch it back off even when an assertion fails so
+      // that later tests do not silently run with auto compaction enabled.
+      sql("set carbon.enable.auto.load.merge=false")
+      dropTable("test_table")
+    }
+  }
+
+  // INSERT OVERWRITE on the parent table of a lazy MV datamap: the existing datamap load is
+  // expected to be marked for delete, and the next rebuild should map to parent segment 2 only.
+  test("test insert overwrite") {
+    sql("drop table IF EXISTS test_table")
+    sql("create table test_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into test_table values('a','abc',1)")
+    sql("insert into test_table values('b','bcd',2)")
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 using 'mv'  with deferred rebuild as select a, sum(b) from test_table  group by a")
+    sql(s"rebuild datamap datamap1")
+    // b is a string column, so the expected sum(b) is null for every group
+    checkAnswer(sql(" select a, sum(b) from test_table  group by a"), Seq(Row("a", null), Row("b", null)))
+    sql("insert overwrite table test_table select 'd','abc',3")
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table")
+    var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+    // only the overwritten row may be visible
+    checkAnswer(sql(" select a, sum(b) from test_table  group by a"), Seq(Row("d", null)))
+    sql(s"rebuild datamap datamap1")
+    loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    val segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]()
+    segmentList.add("2")
+    // NOTE(review): subset check only - an empty recorded list would still pass
+    assert(segmentList.containsAll( segmentMap.get("default.test_table")))
+    sql("drop table IF EXISTS test_table")
+  }
+
+  // Lazy MV datamap defined over an inner join of `products` and `sales`.
+  // `products1`/`sales1` are loaded with the same data but have no datamap and act as the
+  // reference for each checkAnswer. Tables are dropped in afterAll, not here.
+  test("test inner join with mv") {
+    sql("drop table if exists products")
+    sql("create table products (product string, amount int) stored by 'carbondata' ")
+    sql(s"load data INPATH '$resourcesPath/products.csv' into table products")
+    sql("drop table if exists sales")
+    sql("create table sales (product string, quantity int) stored by 'carbondata'")
+    sql(s"load data INPATH '$resourcesPath/sales_data.csv' into table sales")
+    sql("drop datamap if exists innerjoin")
+    sql("Create datamap innerjoin using 'mv'  with deferred rebuild as Select p.product, p.amount, s.quantity, s.product from " +
+        "products p, sales s where p.product=s.product")
+    sql("drop table if exists products1")
+    sql("create table products1 (product string, amount int) stored by 'carbondata' ")
+    sql(s"load data INPATH '$resourcesPath/products.csv' into table products1")
+    sql("drop table if exists sales1")
+    sql("create table sales1 (product string, quantity int) stored by 'carbondata'")
+    sql(s"load data INPATH '$resourcesPath/sales_data.csv' into table sales1")
+    sql(s"rebuild datamap innerjoin")
+    checkAnswer(sql("Select p.product, p.amount, s.quantity from products1 p, sales1 s where p.product=s.product"),
+      sql("Select p.product, p.amount, s.quantity from products p, sales s where p.product=s.product"))
+    // new data on one side of the join, followed by a rebuild
+    sql("insert into products values('Biscuits',10)")
+    sql("insert into products1 values('Biscuits',10)")
+    sql("rebuild datamap innerjoin")
+    checkAnswer(sql("Select p.product, p.amount, s.quantity from products1 p, sales1 s where p.product=s.product"),
+      sql("Select p.product, p.amount, s.quantity from products p, sales s where p.product=s.product"))
+    // new data on the other side WITHOUT a rebuild: the query result must still match
+    sql("insert into sales values('Biscuits',100)")
+    sql("insert into sales1 values('Biscuits',100)")
+    checkAnswer(sql("Select p.product, p.amount, s.quantity from products1 p, sales1 s where p.product=s.product"),
+      sql("Select p.product, p.amount, s.quantity from products p, sales s where p.product=s.product"))
+  }
+
+  // Restricts the queried segments of a parent table that has a lazy MV datamap and checks
+  // that the aggregate result still matches a datamap-free copy (`test_table`) restricted to
+  // the same segment.
+  test("test set segments with main table having mv datamap") {
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS test_table")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql("create table test_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into test_table values('a','abc',1)")
+    sql("insert into test_table values('b','bcd',2)")
+    sql("drop datamap if exists datamap_mt")
+    sql(
+      "create datamap datamap_mt using 'mv'  with deferred rebuild as select a, sum(b) from main_table  group by a")
+    sql("rebuild datamap datamap_mt")
+    checkAnswer(sql("select a, sum(b) from main_table  group by a"),
+      sql("select a, sum(b) from test_table  group by a"))
+    try {
+      sql("SET carbon.input.segments.default.main_table = 1")
+      sql("SET carbon.input.segments.default.test_table=1")
+      checkAnswer(sql("select a, sum(b) from main_table  group by a"),
+        sql("select a, sum(b) from test_table  group by a"))
+    } finally {
+      // The segment restrictions are session properties that outlive the tables; clear them
+      // so that later tests recreating main_table/test_table see all of their segments.
+      sql("reset")
+    }
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS test_table")
+  }
+
+
+  // While `carbon.input.segments` is set for the parent table the query must NOT be rewritten
+  // to the MV datamap; once the session is reset the same query must use it again.
+  test("test set segments with main table having mv datamap before rebuild") {
+    sql("drop table IF EXISTS main_table")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv'  with deferred rebuild as select a, sum(c) from main_table  group by a")
+    // the segment restriction is applied before the datamap is rebuilt
+    sql("SET carbon.input.segments.default.main_table=1")
+    sql(s"rebuild datamap datamap1")
+    val df = sql("select a, sum(c) from main_table  group by a")
+    val analyzed = df.queryExecution.analyzed
+    assert(!verifyMVDataMap(analyzed, "datamap1"))
+    // NOTE(review): if the assertion above fails, this reset is skipped and the segment
+    // restriction stays active for the following tests.
+    sql("reset")
+    checkAnswer(sql("select a, sum(c) from main_table  group by a"), Seq(Row("a", 1), Row("b", 2)))
+    val df1= sql("select a, sum(c) from main_table  group by a")
+    val analyzed1 = df1.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed1, "datamap1"))
+    sql("drop table IF EXISTS main_table")
+  }
+
+  // Custom compaction of datamap segments 0 and 1 via `alter datamap ... compact 'custom'`:
+  // both source load entries must become COMPACTED and the entry at index 2 is expected to
+  // carry the parent-segment mapping.
+  test("test datamap table after datamap table compaction- custom") {
+    sql("drop table IF EXISTS main_table")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv'  with deferred rebuild as select a, sum(b) from main_table  group by a")
+    sql(s"rebuild datamap datamap1")
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql(s"rebuild datamap datamap1")
+    sql("alter datamap datamap1 compact 'custom' where segment.id in (0,1)")
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table")
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.COMPACTED)
+    assert(loadMetadataDetails(1).getSegmentStatus == SegmentStatus.COMPACTED)
+    var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(2).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]()
+    segmentList.add("0")
+    segmentList.add("1")
+    segmentList.add("2")
+    segmentList.add("3")
+    // NOTE(review): subset check only - a shorter recorded list would still pass
+    assert(segmentList.containsAll(segmentMap.get("default.main_table")))
+    sql("drop table IF EXISTS main_table")
+  }
+
+  // MV datamap over an expression of two aggregates. The query result is checked before the
+  // first rebuild, after it, after further inserts without a rebuild, and after a second rebuild.
+  test("test sum(a) + sum(b)") {
+    // Full rebuild will happen in this case
+    sql("drop table IF EXISTS main_table")
+    sql("create table main_table(a int,b int,c int) stored by 'carbondata'")
+    sql("insert into main_table values(1,2,3)")
+    sql("insert into main_table values(1,4,5)")
+    sql("drop datamap if exists datamap_1")
+    sql("create datamap datamap_1 using 'mv'  with deferred rebuild as select sum(a)+sum(b) from main_table")
+    // (1+1) + (2+4) = 8
+    checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(8)))
+    sql("rebuild datamap datamap_1")
+    checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(8)))
+    sql("insert into main_table values(1,2,3)")
+    sql("insert into main_table values(1,4,5)")
+    // the same two rows inserted again double the total
+    checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(16)))
+    sql("rebuild datamap datamap_1")
+    checkAnswer(sql("select sum(a)+sum(b) from main_table"), Seq(Row(16)))
+    sql("drop table IF EXISTS main_table")
+  }
+
+  // Non-lazy MV datamap (created without "with deferred rebuild"): no explicit rebuild is
+  // issued anywhere in this test, yet the datamap table is expected to have a load entry right
+  // after creation and another one after the next load on the parent table.
+  // `test_table1` has no datamap and is the reference for checkAnswer. Tables are dropped in
+  // afterAll, not here.
+  test("test Incremental Loading on non-lazy mv Datamap") {
+    //create table and load data
+    createTableFactTable("test_table")
+    loadDataToFactTable("test_table")
+    createTableFactTable("test_table1")
+    loadDataToFactTable("test_table1")
+    //create datamap on table test_table
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 using 'mv' as select empname, designation " +
+      "from test_table")
+    val query: String = "select empname from test_table"
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table"
+    )
+    // first datamap load entry is expected to map to parent segment 0
+    var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(0).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]()
+    segmentList.add("0")
+    // NOTE(review): subset check only - an empty recorded list would still pass
+    assert(segmentList.containsAll( segmentMap.get("default.test_table")))
+    val df2 = sql(s"$query")
+    val analyzed2 = df2.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed2, "datamap1"))
+    loadDataToFactTable("test_table")
+    loadDataToFactTable("test_table1")
+    // second datamap load entry is expected to map to parent segment 1 only
+    loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+    segmentList.clear()
+    segmentList.add("1")
+    assert(segmentList.containsAll( segmentMap.get("default.test_table")))
+    checkAnswer(sql("select empname, designation from test_table"),
+      sql("select empname, designation from test_table1"))
+    val df3 = sql(s"$query")
+    val analyzed3 = df3.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed3, "datamap1"))
+    // a third load: the plan must still use the datamap and results must still match
+    loadDataToFactTable("test_table")
+    loadDataToFactTable("test_table1")
+    val df4 = sql(s"$query")
+    val analyzed4 = df4.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed4, "datamap1"))
+    checkAnswer(sql("select empname, designation from test_table"),
+      sql("select empname, designation from test_table1"))
+  }
+
+  // Non-lazy MV datamap across an UPDATE on the parent table. No rebuild is issued: after the
+  // update the first datamap load must be marked for delete and a second load entry is expected
+  // to exist. `testtable` has no datamap and is the reference for checkAnswer.
+  test("test MV incremental loading on non-lazy datamap with update operation on main table") {
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS testtable")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql("create table testtable(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into testtable values('a','abc',1)")
+    sql("insert into testtable values('b','bcd',2)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+    var df = sql(s"""select a, sum(b) from main_table group by a""".stripMargin)
+    var analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap1"))
+    checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+      sql(" select a, sum(b) from main_table group by a"))
+    sql("update main_table set(a) = ('aaa') where b = 'abc'").show(false)
+    sql("update testtable set(a) = ('aaa') where b = 'abc'").show(false)
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table")
+    var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+    var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]()
+    segmentList.add("0")
+    segmentList.add("1")
+    // NOTE(review): subset check only - a shorter recorded list would still pass
+    assert(segmentList.containsAll(segmentMap.get("default.main_table")))
+    df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
+    analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap1"))
+    checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+      sql(" select a, sum(b) from main_table group by a"))
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS testtable")
+  }
+
+  // Non-lazy MV datamap across a DELETE on the parent table. No rebuild is issued: after the
+  // delete the first datamap load must be marked for delete and a second load entry is expected
+  // to exist. `testtable` has no datamap and is the reference for checkAnswer.
+  test("test MV incremental loading on non-lazy datamap with delete operation on main table") {
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS testtable")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql("create table testtable(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into testtable values('a','abc',1)")
+    sql("insert into testtable values('b','bcd',2)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+    var df = sql(s"""select a, sum(b) from main_table group by a""".stripMargin)
+    var analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap1"))
+    checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+      sql(" select a, sum(b) from main_table group by a"))
+    sql("delete from  main_table  where b = 'abc'").show(false)
+    sql("delete from  testtable  where b = 'abc'").show(false)
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table")
+    var loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+    var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(1).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]()
+    segmentList.add("0")
+    segmentList.add("1")
+    // NOTE(review): subset check only - a shorter recorded list would still pass
+    assert(segmentList.containsAll(segmentMap.get("default.main_table")))
+    df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
+    analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap1"))
+    checkAnswer(sql(" select a, sum(b) from testtable group by a"),
+      sql(" select a, sum(b) from main_table group by a"))
+    sql("drop table IF EXISTS main_table")
+    sql("drop table IF EXISTS testtable")
+  }
+
+  // Major compaction is run on the parent table only; the test expects a merged segment "0.1"
+  // to be listed for both the parent table and the non-lazy MV datamap table afterwards.
+  test("test whether datamap table is compacted after main table compaction") {
+    sql("drop table IF EXISTS main_table")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("insert into main_table values('b','bcd',2)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+    sql("insert into main_table values('c','abc',1)")
+    sql("insert into main_table values('d','bcd',2)")
+    sql("alter table main_table compact 'major'")
+    checkExistence(sql("show segments for table main_table"), true, "0.1")
+    checkExistence(sql("show segments for table datamap1_table"), true, "0.1")
+    sql("drop table IF EXISTS main_table")
+  }
+
+  // Deletes the only row of a single-segment parent table that has a non-lazy MV datamap and
+  // expects the datamap table to keep exactly one load entry, marked for delete.
+  // main_table is not dropped here; afterAll takes care of it.
+  test("test delete record when table contains single segment") {
+    sql("drop table IF EXISTS main_table")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv' as select a, sum(b) from main_table group by a")
+    sql("delete from  main_table  where b = 'abc'").show(false)
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table")
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    assert(loadMetadataDetails.length == 1)
+    assert(loadMetadataDetails(0).getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE)
+  }
+
+  // Restricts the queried segments of the MV datamap table itself (not the parent table) to
+  // segment 0 and expects a query on the parent table to return a single row even though the
+  // parent table holds two.
+  test("set segments on datamap table") {
+    sql("drop table IF EXISTS main_table")
+    sql("create table main_table(a string,b string,c int) stored by 'carbondata'")
+    sql("insert into main_table values('a','abc',1)")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv' as select a,b from main_table")
+    sql("insert into main_table values('b','abcd',1)")
+    try {
+      sql("SET carbon.input.segments.default.datamap1_table=0")
+      assert(sql("select a,b from main_table").count() == 1)
+    } finally {
+      // The restriction is a session property; clear it so it cannot affect any later test
+      // (or suite sharing the session) that creates a datamap named datamap1.
+      sql("reset")
+    }
+    sql("drop table IF EXISTS main_table")
+  }
+
+  /**
+   * Checks whether the given plan reads from the table backing an MV datamap.
+   *
+   * @param logicalPlan plan to inspect (the tests pass the analyzed plan of a query)
+   * @param dataMapName datamap name; the table looked for is `dataMapName + "_table"`,
+   *                    compared case-insensitively
+   * @return true if at least one LogicalRelation in the plan refers to that table
+   */
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
+    val expectedTableName = dataMapName + "_table"
+    val tables = logicalPlan collect {
+      // A relation without catalog metadata cannot be the datamap table; skip it instead of
+      // calling .get on an empty Option, which would throw NoSuchElementException.
+      case l: LogicalRelation if l.catalogTable.isDefined => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(expectedTableName))
+  }
+
+  /**
+   * Suite-level cleanup: clears session properties set by the tests and drops every table a
+   * test in this suite may have left behind (tests only drop their tables on success).
+   */
+  override def afterAll(): Unit = {
+    // tests set carbon.* session properties (auto load merge, input segments); make sure none
+    // of them survives the suite
+    sql("reset")
+    sql("drop table if exists products")
+    sql("drop table if exists sales")
+    sql("drop table if exists products1")
+    sql("drop table if exists sales1")
+    sql("drop table IF EXISTS test_table")
+    sql("drop table IF EXISTS test_table1")
+    sql("drop table IF EXISTS main_table")
+    // reference table used by the update/delete tests; previously never cleaned up here
+    sql("drop table IF EXISTS testtable")
+    sql("drop table IF EXISTS dimensiontable")
+  }
+
+  /**
+   * Drops `tableName` if it exists and creates it again as a carbon table with the
+   * empname/designation/... fact columns used by the load-based tests.
+   *
+   * @param tableName name of the table to (re)create
+   * @return the result of the CREATE TABLE statement
+   */
+  private def createTableFactTable(tableName: String) = {
+    val dropStatement = s"drop table IF EXISTS $tableName"
+    val createStatement =
+      s"""
+         | CREATE TABLE $tableName (empname String, designation String, doj Timestamp,
+         |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+         |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+         |  utilization int,salary int)
+         | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin
+    sql(dropStatement)
+    sql(createStatement)
+  }
+
+  /**
+   * Loads `data_big.csv` from the test resources directory into `tableName`.
+   *
+   * @param tableName target table of the LOAD DATA statement
+   * @return the result of the LOAD DATA statement
+   */
+  private def loadDataToFactTable(tableName: String) = {
+    val csvPath = s"$resourcesPath/data_big.csv"
+    val loadStatement =
+      s"""LOAD DATA local inpath '$csvPath' INTO TABLE $tableName  OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin
+    sql(loadStatement)
+  }
+}
\ No newline at end of file
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
index 6068ef5..1747e51 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
@@ -83,7 +83,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_1") {
     sql(s"drop datamap if exists datamap_sm1")
     sql(s"create datamap datamap_sm1 using 'mv' as ${sampleTestCases(0)._2}")
-    sql(s"rebuild datamap datamap_sm1")
     val df = sql(sampleTestCases(0)._3)
     val analyzed = df.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap_sm1"))
@@ -93,7 +92,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_3") {
     sql(s"drop datamap if exists datamap_sm2")
     sql(s"create datamap datamap_sm2 using 'mv' as ${sampleTestCases(2)._2}")
-    sql(s"rebuild datamap datamap_sm2")
     val df = sql(sampleTestCases(2)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_sm2"))
@@ -103,7 +101,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_4") {
     sql(s"drop datamap if exists datamap_sm3")
     sql(s"create datamap datamap_sm3 using 'mv' as ${sampleTestCases(3)._2}")
-    sql(s"rebuild datamap datamap_sm3")
     val df = sql(sampleTestCases(3)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_sm3"))
@@ -113,7 +110,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_5") {
     sql(s"drop datamap if exists datamap_sm4")
     sql(s"create datamap datamap_sm4 using 'mv' as ${sampleTestCases(4)._2}")
-    sql(s"rebuild datamap datamap_sm4")
     val df = sql(sampleTestCases(4)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_sm4"))
@@ -123,7 +119,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_6") {
     sql(s"drop datamap if exists datamap_sm5")
     sql(s"create datamap datamap_sm5 using 'mv' as ${sampleTestCases(5)._2}")
-    sql(s"rebuild datamap datamap_sm5")
     val df = sql(sampleTestCases(5)._3)
     val analyzed = df.queryExecution.analyzed
     assert(!verifyMVDataMap(analyzed, "datamap_sm5"))
@@ -133,7 +128,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_7") {
     sql(s"drop datamap if exists datamap_sm6")
     sql(s"create datamap datamap_sm6 using 'mv' as ${sampleTestCases(6)._2}")
-    sql(s"rebuild datamap datamap_sm6")
     val df = sql(sampleTestCases(6)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_sm6"))
@@ -143,7 +137,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_8") {
     sql(s"drop datamap if exists datamap_sm7")
     sql(s"create datamap datamap_sm7 using 'mv' as ${sampleTestCases(7)._2}")
-    sql(s"rebuild datamap datamap_sm7")
     val df = sql(sampleTestCases(7)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_sm7"))
@@ -153,7 +146,6 @@ class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with sampleTestCases case_9") {
     sql(s"drop datamap if exists datamap_sm8")
     sql(s"create datamap datamap_sm8 using 'mv' as ${sampleTestCases(8)._2}")
-    sql(s"rebuild datamap datamap_sm8")
     val df = sql(sampleTestCases(8)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_sm8"))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
index b2d03e1..7304527 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
@@ -51,7 +51,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_1") {
     sql(s"drop datamap if exists datamap_tpcds1")
     sql(s"create datamap datamap_tpcds1 using 'mv' as ${tpcds_1_4_testCases(0)._2}")
-    sql(s"rebuild datamap datamap_tpcds1")
     val df = sql(tpcds_1_4_testCases(0)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds1"))
@@ -61,7 +60,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_3") {
     sql(s"drop datamap if exists datamap_tpcds3")
     sql(s"create datamap datamap_tpcds3 using 'mv' as ${tpcds_1_4_testCases(2)._2}")
-    sql(s"rebuild datamap datamap_tpcds3")
     val df = sql(tpcds_1_4_testCases(2)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds3"))
@@ -71,7 +69,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_4") {
     sql(s"drop datamap if exists datamap_tpcds4")
     sql(s"create datamap datamap_tpcds4 using 'mv' as ${tpcds_1_4_testCases(3)._2}")
-    sql(s"rebuild datamap datamap_tpcds4")
     val df = sql(tpcds_1_4_testCases(3)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds4"))
@@ -81,7 +78,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_5") {
     sql(s"drop datamap if exists datamap_tpcds5")
     sql(s"create datamap datamap_tpcds5 using 'mv' as ${tpcds_1_4_testCases(4)._2}")
-    sql(s"rebuild datamap datamap_tpcds5")
     val df = sql(tpcds_1_4_testCases(4)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds5"))
@@ -91,7 +87,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_6") {
     sql(s"drop datamap if exists datamap_tpcds6")
     sql(s"create datamap datamap_tpcds6 using 'mv' as ${tpcds_1_4_testCases(5)._2}")
-    sql(s"rebuild datamap datamap_tpcds6")
     val df = sql(tpcds_1_4_testCases(5)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds6"))
@@ -101,7 +96,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_8") {
     sql(s"drop datamap if exists datamap_tpcds8")
     sql(s"create datamap datamap_tpcds8 using 'mv' as ${tpcds_1_4_testCases(7)._2}")
-    sql(s"rebuild datamap datamap_tpcds8")
     val df = sql(tpcds_1_4_testCases(7)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds8"))
@@ -111,7 +105,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_11") {
     sql(s"drop datamap if exists datamap_tpcds11")
     sql(s"create datamap datamap_tpcds11 using 'mv' as ${tpcds_1_4_testCases(10)._2}")
-    sql(s"rebuild datamap datamap_tpcds11")
     val df = sql(tpcds_1_4_testCases(10)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds11"))
@@ -121,7 +114,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_15") {
     sql(s"drop datamap if exists datamap_tpcds15")
     sql(s"create datamap datamap_tpcds15 using 'mv' as ${tpcds_1_4_testCases(14)._2}")
-    sql(s"rebuild datamap datamap_tpcds15")
     val df = sql(tpcds_1_4_testCases(14)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds15"))
@@ -131,7 +123,6 @@ class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpcds_1_4_testCases case_16") {
     sql(s"drop datamap if exists datamap_tpcds16")
     sql(s"create datamap datamap_tpcds16 using 'mv' as ${tpcds_1_4_testCases(15)._2}")
-    sql(s"rebuild datamap datamap_tpcds16")
     val df = sql(tpcds_1_4_testCases(15)._3)
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap_tpcds16"))
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
index ff5bdac..5788a23 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala
@@ -71,7 +71,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch1") {
     sql(s"drop datamap if exists datamap1")
     sql("create datamap datamap1 using 'mv' as select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as count_order from lineitem group by l_returnflag, l_linestatus,l_shipdate")
-    sql(s"rebuild datamap datamap1")
     val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as count_order from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap1"))
@@ -82,7 +81,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch1 with order") {
     sql(s"drop datamap if exists datamap2")
     sql("create datamap datamap2 using 'mv' as select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group by l_returnflag, l_linestatus,l_shipdate order by l_returnflag, l_linestatus")
-    sql(s"rebuild datamap datamap2")
     val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap2"))
@@ -93,7 +91,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch1 with sub group by") {
     sql(s"drop datamap if exists datamap3")
     sql("create datamap datamap3 using 'mv' as select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group by l_returnflag, l_linestatus,l_shipdate")
-    sql(s"rebuild datamap datamap3")
     val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap3"))
@@ -104,7 +101,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpch3") {
     sql(s"drop datamap if exists datamap4")
     sql("create datamap datamap4 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority")
-    sql(s"rebuild datamap datamap4")
     val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap4"))
@@ -115,7 +111,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch3 with no filters on mv") {
     sql(s"drop datamap if exists datamap5")
     sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
-    sql(s"rebuild datamap datamap5")
     val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -126,7 +121,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpch3 with filters on mv and all filter columns on projection") {
     sql(s"drop datamap if exists datamap5")
     sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
-    sql(s"rebuild datamap datamap5")
     val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap5"))
@@ -137,7 +131,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpch4 (core issue)") {
     sql(s"drop datamap if exists datamap6")
     sql("create datamap datamap6 using 'mv' as select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority")
-    sql(s"rebuild datamap datamap6")
     val df = sql("select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap6"))
@@ -148,7 +141,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpch5") {
     sql(s"drop datamap if exists datamap7")
     sql("create datamap datamap7 using 'mv' as select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name")
-    sql(s"rebuild datamap datamap7")
     val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap7"))
@@ -159,7 +151,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch5 with no filters on mv") {
     sql(s"drop datamap if exists datamap8")
     sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey  group by n_name,o_orderdate,r_name")
-    sql(s"rebuild datamap datamap8")
     val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap8"))
@@ -170,7 +161,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch6") {
     sql(s"drop datamap if exists datamap9")
     sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
-    sql(s"rebuild datamap datamap9")
     val df = sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap9"))
@@ -182,7 +172,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch6 with no filters on mv") {
     sql(s"drop datamap if exists datamap10")
     sql("create datamap datamap10 using 'mv' as select sum(l_extendedprice * l_discount) as revenue,l_shipdate,l_discount,l_quantity from lineitem group by l_shipdate,l_discount,l_quantity")
-    sql(s"rebuild datamap datamap10")
     val df = sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap10"))
@@ -194,7 +183,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch7 part of query1") {
     sql(s"drop datamap if exists datamap11")
     sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
-    sql(s"rebuild datamap datamap11")
     val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('1996-12-31')")
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap11"))
@@ -205,7 +193,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   test("test create datamap with tpch7 part of query2 (core issue)") {
     sql(s"drop datamap if exists datamap12")
     sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
-    sql(s"rebuild datamap datamap12")
     val df = sql("select supp_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('19 [...]
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap12"))
@@ -216,7 +203,6 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
   ignore("test create datamap with tpch7 part of query3 (self join issue)") {
     sql(s"drop datamap if exists datamap13")
     sql("create datamap datamap13 using 'mv' as select n1.n_name as supp_nation, n2.n_name as cust_nation, l_shipdate, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey")
-    sql(s"rebuild datamap datamap13")
     val df = sql("select supp_nation, cust_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, n2.n_name as cust_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey and ( (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY') or (n1.n_ [...]
     val analyzed = df.queryExecution.analyzed
     assert(verifyMVDataMap(analyzed, "datamap13"))
diff --git a/integration/spark-common-test/src/test/resources/products.csv b/integration/spark-common-test/src/test/resources/products.csv
new file mode 100644
index 0000000..3d0c6a7
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/products.csv
@@ -0,0 +1,5 @@
+product,amount
+Mobile,2000
+Laptop,3000
+Kettle,70
+Washing Machine,1000
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/resources/sales_data.csv b/integration/spark-common-test/src/test/resources/sales_data.csv
new file mode 100644
index 0000000..c80b9e5
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/sales_data.csv
@@ -0,0 +1,5 @@
+product,quantity
+Mobile,1
+Laptop,10
+Chocolates,200
+Biscuits,800
\ No newline at end of file
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
index 8e1f22a..f6031d1 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.datamap;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.MetadataProcessException;
@@ -98,8 +99,9 @@ public class IndexDataMapProvider extends DataMapProvider {
   }
 
   @Override
-  public void rebuild() {
+  public boolean rebuild() {
     IndexDataMapRebuildRDD.rebuildDataMap(sparkSession, getMainTable(), getDataMapSchema());
+    return true;
   }
 
   private DataMapFactory<? extends DataMap> createDataMapFactory()
@@ -132,4 +134,9 @@ public class IndexDataMapProvider extends DataMapProvider {
   public boolean supportRebuild() {
     return dataMapFactory.supportRebuild();
   }
+
+  @Override
+  public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+    return false;
+  }
 }
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
index 233c41f..72390ce 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.datamap;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -94,10 +95,11 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
   }
 
   @Override
-  public void rebuild() {
+  public boolean rebuild() {
     if (helper != null) {
       helper.initData(sparkSession);
     }
+    return true;
   }
 
   @Override
@@ -109,4 +111,9 @@ public class PreAggregateDataMapProvider extends DataMapProvider {
   public boolean supportRebuild() {
     return false;
   }
+
+  @Override
+  public boolean rebuildInternal(String newLoadName, Map<String, List<String>> segmentMap) {
+    return false;
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 82c893f..682e76c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -953,6 +953,15 @@ object CarbonDataRDDFactory {
       throw new Exception(errorMessage)
     } else {
       DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
+      if (overwriteTable) {
+        val allDataMapSchemas = DataMapStoreManager.getInstance
+          .getDataMapSchemasOfTable(carbonTable).asScala
+          .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                                   !dataMapSchema.isIndexDataMap).asJava
+        if (!allDataMapSchemas.isEmpty) {
+          DataMapStatusManager.truncateDataMap(allDataMapSchemas)
+        }
+      }
     }
     (done, metadataDetails)
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 149e45e..d9dec68 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
 import org.apache.spark.sql.execution.command.cache._
+import org.apache.spark.sql.execution.command.mv.{AlterDataMaptableCompactionPostListener, LoadPostDataMapListener}
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
 import org.apache.spark.sql.hive._
@@ -37,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -173,6 +174,8 @@ object CarbonEnv {
       .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
       .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
         AlterPreAggregateTableCompactionPostListener)
+      .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
+        AlterDataMaptableCompactionPostListener)
       .addListener(classOf[LoadMetadataEvent], LoadProcessMetaListener)
       .addListener(classOf[LoadMetadataEvent], CompactionProcessMetaListener)
       .addListener(classOf[LoadTablePostStatusUpdateEvent], CommitPreAggregateListener)
@@ -183,6 +186,9 @@ object CarbonEnv {
         AlterTableDropPartitionPostStatusListener)
       .addListener(classOf[AlterTableDropPartitionMetaEvent], AlterTableDropPartitionMetaListener)
       .addListener(classOf[LoadTablePreStatusUpdateEvent], new MergeIndexEventListener)
+      .addListener(classOf[LoadTablePostExecutionEvent], LoadPostDataMapListener)
+      .addListener(classOf[UpdateTablePostEvent], LoadPostDataMapListener )
+      .addListener(classOf[DeleteFromTablePostEvent], LoadPostDataMapListener )
       .addListener(classOf[AlterTableMergeIndexEvent], new MergeIndexEventListener)
       .addListener(classOf[BuildDataMapPostExecutionEvent], new MergeBloomIndexEventListener)
       .addListener(classOf[DropTableCacheEvent], DropCachePreAggEventListener)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 271a19b..d9a6490 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -87,13 +87,6 @@ case class CarbonCreateDataMapCommand(
 
     val property = dmProperties.map(x => (x._1.trim, x._2.trim)).asJava
     val javaMap = new java.util.HashMap[String, String](property)
-    // for MV, it is deferred rebuild by default and cannot be non-deferred rebuild
-    if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.MV.getShortName)) {
-      if (!deferredRebuild) {
-        LOGGER.warn(s"DEFERRED REBUILD is enabled by default for MV datamap $dataMapName")
-      }
-      deferredRebuild = true
-    }
     javaMap.put(DataMapProperty.DEFERRED_REBUILD, deferredRebuild.toString)
     dataMapSchema.setProperties(javaMap)
 
@@ -180,6 +173,10 @@ case class CarbonCreateDataMapCommand(
             operationContext)
         }
       }
+      if (null != dataMapSchema.getRelationIdentifier && !dataMapSchema.isIndexDataMap &&
+          !dataMapSchema.isLazy) {
+        DataMapStatusManager.enableDataMap(dataMapName)
+      }
     }
     Seq.empty
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
index 267fedd..345c5a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.execution.command.DataCommand
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.DataMapManager
 import org.apache.carbondata.events.{UpdateDataMapPostExecutionEvent, _}
@@ -50,7 +52,8 @@ case class CarbonDataMapRebuildCommand(
       }
     }
     val schema = schemaOption.get
-    if (!schema.isLazy) {
+    if (!schema.isLazy &&
+        (schema.isIndexDataMap || schema.isInstanceOf[AggregationDataMapSchema])) {
       throw new MalformedDataMapCommandException(
         s"Non-lazy datamap $dataMapName does not support rebuild")
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 9d8bf90..ee8e8f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapUtil
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
@@ -320,6 +322,7 @@ case class CarbonAlterTableCompactionCommand(
           throw e
       } finally {
         updateLock.unlock()
+        DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
       }
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 4b5b4b6..c142398 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events._
@@ -70,6 +71,19 @@ case class CarbonCleanFilesCommand(
             isInternalCleanCall = true)
       }.toList
       cleanFileCommands.foreach(_.processMetadata(sparkSession))
+    } else if (CarbonTable.hasMVDataMap(carbonTable)) {
+      val allDataMapSchemas = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(carbonTable).asScala
+        .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                                 !dataMapSchema.isIndexDataMap)
+      cleanFileCommands = allDataMapSchemas.map {
+        dataMapSchema =>
+          val relationIdentifier = dataMapSchema.getRelationIdentifier
+          CarbonCleanFilesCommand(
+            Some(relationIdentifier.getDatabaseName), Some(relationIdentifier.getTableName),
+            isInternalCleanCall = true)
+      }.toList
+      cleanFileCommands.foreach(_.processMetadata(sparkSession))
     } else if (carbonTable.isChildDataMap && !isInternalCleanCall) {
       throwMetadataException(
         carbonTable.getDatabaseName, carbonTable.getTableName,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 8b8fe0d..3521c97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -118,6 +122,14 @@ private[sql] case class CarbonProjectForDeleteCommand(
         throw new Exception(executorErrors.errorMsg)
       }
 
+      val allDataMapSchemas = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(carbonTable).asScala
+        .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                                 !dataMapSchema.isIndexDataMap).asJava
+      if (!allDataMapSchemas.isEmpty) {
+        DataMapStatusManager.truncateDataMap(allDataMapSchemas)
+      }
+
       // trigger post event for Delete from table
       val deleteFromTablePostEvent: DeleteFromTablePostEvent =
         DeleteFromTablePostEvent(sparkSession, carbonTable)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 9fbf745..20a8435 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.mutation
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command._
@@ -28,7 +30,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -171,6 +174,13 @@ private[sql] case class CarbonProjectForUpdateCommand(
       HorizontalCompaction.tryHorizontalCompaction(
         sparkSession, carbonTable, isUpdateOperation = true)
 
+      val allDataMapSchemas = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(carbonTable).asScala
+        .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                                 !dataMapSchema.isIndexDataMap).asJava
+      if (!allDataMapSchemas.isEmpty) {
+        DataMapStatusManager.truncateDataMap(allDataMapSchemas)
+      }
       // trigger event for Update table
       val updateTablePostEvent: UpdateTablePostEvent =
         UpdateTablePostEvent(sparkSession, carbonTable)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
new file mode 100644
index 0000000..f94b73a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.command.mv
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.command.AlterTableModel
+import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.events.{
+  AlterTableCompactionPreStatusUpdateEvent,
+  DeleteFromTablePostEvent, Event, OperationContext, OperationEventListener, UpdateTablePostEvent
+}
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
+import org.apache.carbondata.processing.merger.CompactionType
+
+/**
+ * Listener to trigger compaction on mv datamap after main table compaction
+ */
+object AlterDataMaptableCompactionPostListener extends OperationEventListener {
+  /**
+   * Triggers the same type of compaction on each mv datamap table of the table that
+   * raised the compaction event. Nothing is done for CUSTOM compaction.
+   *
+   * @param event            cast to AlterTableCompactionPreStatusUpdateEvent
+   * @param operationContext passed on to every datamap table compaction command
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val preStatusUpdateEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
+    val compactionType = preStatusUpdateEvent.carbonMergerMapping.campactionType
+    if (compactionType != CompactionType.CUSTOM) {
+      // keep only datamaps that have a relation identifier and are not index datamaps
+      val mvSchemas = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(preStatusUpdateEvent.carbonTable).asScala
+        .filterNot(schema => null == schema.getRelationIdentifier || schema.isIndexDataMap)
+      for (schema <- mvSchemas) {
+        val childTable = schema.getRelationIdentifier
+        val databaseName = childTable.getDatabaseName
+        val tableName = childTable.getTableName
+        val compactionModel = AlterTableModel(
+          Some(databaseName),
+          tableName,
+          None,
+          compactionType.toString,
+          Some(System.currentTimeMillis()),
+          "")
+        // store the segment id of the event's load model under "<db>_<table>_Segment"
+        // before the compaction command for the datamap table runs with the same context
+        operationContext.setProperty(
+          s"${databaseName}_${tableName}_Segment",
+          preStatusUpdateEvent.carbonLoadModel.getSegmentId)
+        CarbonAlterTableCompactionCommand(
+          compactionModel,
+          operationContext = operationContext).run(preStatusUpdateEvent.sparkSession)
+      }
+    }
+  }
+}
+
+/**
+ * Listener to trigger data load on mv datamap after main table data load
+ */
+object LoadPostDataMapListener extends OperationEventListener {
+  /**
+   * Rebuilds every non-lazy mv datamap of the table on which a data load, update or
+   * delete has finished. A datamap is enabled after a successful rebuild; when the
+   * rebuild fails the datamap is disabled and the exception is rethrown.
+   *
+   * @param event            LoadTablePostExecutionEvent, UpdateTablePostEvent or
+   *                         DeleteFromTablePostEvent; any other event is ignored
+   * @param operationContext not used by this listener
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    // Option(...) turns a null reference into None. The previous Some(...) wrapped the
+    // null and always reported isDefined, so a null load model ended in a
+    // NullPointerException instead of being skipped.
+    val carbonTable: Option[CarbonTable] =
+      event match {
+        case loadEvent: LoadTablePostExecutionEvent =>
+          Option(loadEvent.getCarbonLoadModel)
+            .flatMap(loadModel => Option(loadModel.getCarbonDataLoadSchema.getCarbonTable))
+        case updateEvent: UpdateTablePostEvent =>
+          Option(updateEvent.carbonTable)
+        case deleteEvent: DeleteFromTablePostEvent =>
+          Option(deleteEvent.carbonTable)
+        case _ =>
+          None
+      }
+    carbonTable.foreach(rebuildNonLazyDataMaps)
+  }
+
+  /**
+   * Rebuilds and enables the non-lazy datamaps of the given table that have a relation
+   * identifier and are not index datamaps.
+   *
+   * @param carbonTable table whose datamaps are rebuilt
+   */
+  private def rebuildNonLazyDataMaps(carbonTable: CarbonTable): Unit = {
+    val nonLazySchemas = DataMapStoreManager.getInstance
+      .getDataMapSchemasOfTable(carbonTable).asScala
+      .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                               !dataMapSchema.isIndexDataMap && !dataMapSchema.isLazy)
+    if (nonLazySchemas.nonEmpty) {
+      // The active session is looked up only when there is a datamap to rebuild.
+      val sparkSession = SparkSession.getActiveSession.get
+      nonLazySchemas.foreach { dataMapSchema =>
+        val provider = DataMapManager.get()
+          .getDataMapProvider(carbonTable, dataMapSchema, sparkSession)
+        try {
+          provider.rebuild()
+        } catch {
+          case ex: Exception =>
+            // mark the datamap as disabled before propagating the failure
+            DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+            throw ex
+        }
+        DataMapStatusManager.enableDataMap(dataMapSchema.getDataMapName)
+      }
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 62e2d85..d875fdf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -35,7 +35,9 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 
 case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
@@ -69,10 +71,22 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
             "Update operation is not supported for pre-aggregate table")
         }
         val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
-        if (!indexSchemas.isEmpty) {
+        if (CarbonTable.hasMVDataMap(carbonTable)) {
+          val allDataMapSchemas = DataMapStoreManager.getInstance
+            .getDataMapSchemasOfTable(carbonTable).asScala
+            .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                                     !dataMapSchema.isIndexDataMap).asJava
+          allDataMapSchemas.asScala.foreach { dataMapSchema =>
+            DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+          }
+        } else if (!indexSchemas.isEmpty) {
           throw new UnsupportedOperationException(
             "Update operation is not supported for table which has index datamaps")
         }
+        if (carbonTable.isChildTable) {
+          throw new UnsupportedOperationException(
+            "Update operation is not supported for mv datamap table")
+        }
       }
       val tableRelation = if (SparkUtil.isSparkVersionEqualTo("2.1")) {
         relation
@@ -200,7 +214,15 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
               "Delete operation is not supported for pre-aggregate table")
           }
           val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
-          if (!indexSchemas.isEmpty) {
+          if (CarbonTable.hasMVDataMap(carbonTable)) {
+            val allDataMapSchemas = DataMapStoreManager.getInstance
+              .getDataMapSchemasOfTable(carbonTable).asScala
+              .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
+                                       !dataMapSchema.isIndexDataMap).asJava
+            allDataMapSchemas.asScala.foreach { dataMapSchema =>
+              DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
+            }
+          } else if (!indexSchemas.isEmpty) {
             throw new UnsupportedOperationException(
               "Delete operation is not supported for table which has index datamaps")
           }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 5f5cc12..1c3b7cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -77,7 +77,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   protected lazy val startCommand: Parser[LogicalPlan] =
     loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords |
-    alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli | cacheManagement
+    alterPartition | datamapManagement | alterTableFinishStreaming | stream | cli |
+    cacheManagement | alterDataMap
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
     deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew
@@ -246,6 +247,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         CarbonDataMapRebuildCommand(datamap, tableIdent)
     }
 
+  // ALTER DATAMAP [db.]name COMPACT 'type' [WHERE SEGMENT.ID IN (id, ...)] [;]
+  protected lazy val alterDataMap: Parser[LogicalPlan] =
+    ALTER ~> DATAMAP ~> (ident <~ ".").? ~ ident ~ (COMPACT ~> stringLit) ~
+    (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
+    opt(";") ^^ {
+      case databaseName ~ dataMapName ~ compactionType ~ customSegmentIds =>
+        CarbonAlterTableCompactionCommand(
+          AlterTableModel(convertDbNameToLowerCase(databaseName), dataMapName + "_table", None,
+            compactionType, Some(System.currentTimeMillis()), null, customSegmentIds))
+    }
+
   protected lazy val deleteRecords: Parser[LogicalPlan] =
     (DELETE ~> FROM ~> aliasTable) ~ restInput.? <~ opt(";") ^^ {
       case table ~ rest =>
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 01e47be..1d51592 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -35,15 +35,19 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.status.DataMapSegmentStatusUtil;
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
@@ -299,7 +303,8 @@ public final class CarbonDataMergerUtil {
    */
   public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
       String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
-      CompactionType compactionType, String segmentFile) throws IOException {
+      CompactionType compactionType, String segmentFile) throws IOException,
+      NoSuchDataMapException {
     boolean tableStatusUpdationStatus = false;
     AbsoluteTableIdentifier identifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -351,6 +356,23 @@ public final class CarbonDataMergerUtil {
           loadMetadataDetails.setMajorCompacted("true");
         }
 
+        if (carbonTable.isChildTable()) {
+          // The table is a child table: look up its datamap schema by the DATAMAP_NAME
+          // table property and record the updated segment mapping in the merged
+          // entry's extraInfo. A NoSuchDataMapException raised by the lookup simply
+          // propagates to the caller (this method declares it), so no
+          // catch-and-rethrow wrapper is needed.
+          String dataMapName = carbonTable.getTableInfo().getFactTable().getTableProperties()
+              .get(CarbonCommonConstants.DATAMAP_NAME);
+          DataMapSchema dataMapSchema = DataMapStatusManager.getDataMapSchema(dataMapName);
+          if (null != dataMapSchema) {
+            String segmentMap = DataMapSegmentStatusUtil
+                .getUpdatedSegmentMap(mergedLoadNumber, dataMapSchema, loadDetails);
+            // extraInfo of the merged load entry carries the segment mapping
+            loadMetadataDetails.setExtraInfo(segmentMap);
+          }
+        }
+
         List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
 
         // put the merged folder entry
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index a396978..3769671 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -263,15 +263,26 @@ public final class CarbonLoaderUtil {
           String segmentId =
               String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
           loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
+          LoadMetadataDetails entryTobeRemoved = null;
           // Segment id would be provided in case this is compaction flow for aggregate data map.
           // If that is true then used the segment id as the load name.
           if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !loadModel
               .getSegmentId().isEmpty()) {
             newMetaEntry.setLoadName(loadModel.getSegmentId());
+          } else if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildTable()
+              && !loadModel.getSegmentId().isEmpty()) {
+            for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+              if (entry.getLoadName().equalsIgnoreCase(loadModel.getSegmentId())) {
+                newMetaEntry.setLoadName(loadModel.getSegmentId());
+                newMetaEntry.setExtraInfo(entry.getExtraInfo());
+                entryTobeRemoved = entry;
+              }
+            }
           } else {
             newMetaEntry.setLoadName(segmentId);
             loadModel.setSegmentId(segmentId);
           }
+          listOfLoadFolderDetails.remove(entryTobeRemoved);
           // Exception should be thrown if:
           // 1. If insert overwrite is in progress and any other load or insert operation
           // is triggered
@@ -299,6 +310,7 @@ public final class CarbonLoaderUtil {
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
             if (entry.getLoadName().equals(newMetaEntry.getLoadName())
                 && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
+              newMetaEntry.setExtraInfo(entry.getExtraInfo());
               found = true;
               break;
             }