Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/05/13 07:50:31 UTC

[carbondata] branch master updated: [CARBONDATA-3800] Load data to SI and MV after insert stage command

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 66b7533  [CARBONDATA-3800] Load data to SI and MV after insert stage command
66b7533 is described below

commit 66b7533f6079f36548f7b5410815d601eea5d2a0
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed May 6 14:51:09 2020 +0530

    [CARBONDATA-3800] Load data to SI and MV after insert stage command
    
    Why is this PR needed?
    1. Data is not loaded to the child tables (SI and MV) after executing the insert stage command.
    2. The datamap keyword still exists in some files.
    
    What changes were proposed in this PR?
    1. Add Load Pre and Post listeners in CarbonInsertFromStageCommand to trigger data load
       to secondary indexes and materialized views (see the sketch after this list).
    2. Rename datamap to index
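    
    Illustrative only (not part of this patch): a minimal sketch of how the load
    pre/post events could be fired around the stage load in
    CarbonInsertFromStageCommand. The event constructor arguments and the local
    names "table" and "loadModel" are assumptions here; the event and listener-bus
    classes are the ones imported elsewhere in this change.
    
        import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
        import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent}
    
        // shared context so pre and post listeners can exchange state
        val operationContext = new OperationContext
        // fire the pre-status-update event before committing the staged segment
        // (constructor arguments assumed: table identifier + load model)
        OperationListenerBus.getInstance().fireEvent(
          new LoadTablePreStatusUpdateEvent(table.getCarbonTableIdentifier, loadModel),
          operationContext)
    
        // ... commit the staged segment and update the table status here ...
    
        // fire the post-execution event so SI and MV listeners load the new segment
        OperationListenerBus.getInstance().fireEvent(
          new LoadTablePostExecutionEvent(table.getCarbonTableIdentifier, loadModel),
          operationContext)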
    
    This closes #3747
---
 .../core/constants/CarbonCommonConstants.java      |   5 +
 .../carbondata/core/index/AbstractIndexJob.java    |   2 +-
 .../org/apache/carbondata/core/index/Segment.java  |   2 +-
 .../apache/carbondata/core/index/TableIndex.java   |   2 +-
 .../apache/carbondata/core/index/dev/Index.java    |   4 +-
 .../core/indexstore/AbstractMemoryDMStore.java     |   2 +-
 .../core/indexstore/BlockletIndexStore.java        |   4 +-
 .../core/indexstore/SafeMemoryDMStore.java         |   2 +-
 .../core/indexstore/UnsafeMemoryDMStore.java       |   2 +-
 .../core/indexstore/blockletindex/BlockIndex.java  |   2 +-
 .../blockletindex/BlockletDataRefNode.java         |   2 +-
 .../carbondata/core/indexstore/row/IndexRow.java   |   2 +-
 .../core/indexstore/row/IndexRowImpl.java          |   2 +-
 .../core/indexstore/row/UnsafeIndexRow.java        |   2 +-
 .../org/apache/carbondata/core/view/MVManager.java |   2 +-
 .../apache/carbondata/core/view/MVProvider.java    |   2 +-
 .../core/metadata/schema/table/TableInfoTest.java  |  39 +--
 docs/configuration-parameters.md                   |   2 +-
 .../apache/carbondata/examplesCI/RunExamples.scala |   2 +-
 .../hadoop/api/CarbonTableInputFormat.java         |   2 +-
 .../index/examples/MinMaxIndexIndexFactory.java    |   4 +-
 .../examples/MinMaxIndexSuite.scala                |   0
 .../index/lucene/LuceneFineGrainIndex.java         |   2 +-
 .../carbon/flink/TestCarbonPartitionWriter.scala   | 238 +++++++++------
 .../org/apache/carbon/flink/TestCarbonWriter.scala | 336 ++++++++++-----------
 .../cluster/sdv/generated/LuceneTestCase.scala     |   4 +-
 ...opDataMapEvents.scala => DropIndexEvents.scala} |   0
 .../spark/rdd/CarbonDataRDDFactory.scala           |   2 +-
 .../carbondata/streaming/StreamSinkFactory.scala   |   7 +-
 .../scala/org/apache/spark/sql/CarbonSession.scala |   2 +-
 .../spark/sql/events/MergeIndexEventListener.scala |  11 +-
 .../command/cache/CarbonShowCacheCommand.scala     |  25 +-
 .../command/management/CarbonAddLoadCommand.scala  |   2 +-
 .../management/CarbonInsertFromStageCommand.scala  |  46 ++-
 .../schema/CarbonAlterTableRenameCommand.scala     |   2 +-
 .../Jobs/BlockletIndexDetailsWithSchema.java       |   6 +-
 .../Jobs/BlockletIndexInputFormat.java             |   4 +-
 .../Jobs/SparkBlockletIndexLoaderJob.scala         |   4 +-
 .../org/apache/spark/sql/test/util/QueryTest.scala |   1 +
 .../org/apache/spark/util/AlterTableUtil.scala     |   2 +-
 .../bloom/BloomCoarseGrainIndexFunctionSuite.scala |   2 +-
 .../index/bloom/BloomCoarseGrainIndexSuite.scala   |   4 +-
 .../index/lucene/LuceneCoarseGrainIndexSuite.scala |  16 +-
 .../index/lucene/LuceneFineGrainIndexSuite.scala   |  48 +--
 .../spark/testsuite/index/CGIndexTestCase.scala    |   2 +-
 .../spark/testsuite/index/FGIndexTestCase.scala    |   2 +-
 ...mand.scala => TestCreateMVWithTimeSeries.scala} |   2 +-
 .../processing/index/IndexWriterListener.java      |   4 +-
 .../processing/loading/events/LoadEvents.java      |  30 --
 .../store/writer/AbstractFactDataWriter.java       |   2 +-
 50 files changed, 463 insertions(+), 431 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 130eca1..ccdbb5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2427,4 +2427,9 @@ public final class CarbonCommonConstants {
    */
   @CarbonProperty
   public static final String DISABLE_SQL_REWRITE = "disable_sql_rewrite";
+
+  /**
+   * property which defines the insert stage flow
+   */
+  public static final String IS_INSERT_STAGE = "is_insert_stage";
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
index 8b87012..fbf023a 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/AbstractIndexJob.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 /**
- * abstract class for data map job
+ * abstract class for index job
  */
 public abstract class AbstractIndexJob implements IndexJob {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/index/Segment.java b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
index 7f36474..e76253d 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/Segment.java
@@ -72,7 +72,7 @@ public class Segment implements Serializable, Writable {
   private long indexSize = 0;
 
   /**
-   * Whether to cache the segment data maps in executors or not.
+   * Whether to cache the segment indexes in executors or not.
    */
   private boolean isCacheable = true;
 
diff --git a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
index 5b6be91..a76b533 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/TableIndex.java
@@ -671,7 +671,7 @@ public final class TableIndex extends OperationEventListener {
       for (Index index : indices) {
         if (index.isScanRequired(filterExp)) {
           // If any one task in a given segment contains the data that means the segment need to
-          // be scanned and we need to validate further data maps in the same segment
+          // be scanned and we need to validate further indexes in the same segment
           prunedSegments.add(segment);
           break;
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java b/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
index 9f1d646..0270ab1 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/dev/Index.java
@@ -57,13 +57,13 @@ public interface Index<T extends Blocklet> {
       CarbonTable carbonTable, FilterExecuter filterExecuter);
 
   /**
-   * Prune the data maps for finding the row count. It returns a Map of
+   * Prune the indexes for finding the row count. It returns a Map of
    * blockletpath and the row count
    */
   long getRowCount(Segment segment, List<PartitionSpec> partitions);
 
   /**
-   * Prune the data maps for finding the row count for each block. It returns a Map of
+   * Prune the indexes for finding the row count for each block. It returns a Map of
    * blockletpath and the row count
    */
   Map<String, Long> getRowCountForEachBlock(Segment segment, List<PartitionSpec> partitions,
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
index 2019838..8d29e78 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/AbstractMemoryDMStore.java
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 /**
- * Store the data map row @{@link IndexRow}
+ * Store the index row @{@link IndexRow}
  */
 public abstract class AbstractMemoryDMStore implements Serializable {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
index 9000382..e9231bf 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletIndexStore.java
@@ -248,7 +248,7 @@ public class BlockletIndexStore
   @Override
   public void put(TableBlockIndexUniqueIdentifierWrapper tableBlockIndexUniqueIdentifierWrapper,
       BlockletIndexWrapper wrapper) throws IOException {
-    // As dataMap will use unsafe memory, it is not recommended to overwrite an existing entry
+    // As index will use unsafe memory, it is not recommended to overwrite an existing entry
     // as in that case clearing unsafe memory need to be taken card. If at all index entry
     // in the cache need to be overwritten then use the invalidate interface
     // and then use the put interface
@@ -266,7 +266,7 @@ public class BlockletIndexStore
         lruCache.put(tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
             .getUniqueTableSegmentIdentifier(), wrapper, wrapper.getMemorySize(), expirationTime);
       } catch (Throwable e) {
-        // clear all the memory acquired by data map in case of any failure
+        // clear all the memory acquired by index in case of any failure
         for (Index blockletIndex : indexes) {
           blockletIndex.clear();
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
index 37ac092..f904067 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
- * Store the data map row @{@link IndexRow} data to memory.
+ * Store the index row @{@link IndexRow} data to memory.
  */
 public class SafeMemoryDMStore extends AbstractMemoryDMStore {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 3d0bf9e..4fd0ebe 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -32,7 +32,7 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
 
 /**
- * Store the data map row @{@link IndexRow} data to unsafe.
+ * Store the index row @{@link IndexRow} data to unsafe.
  */
 public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 3ab3cd4..0ad7940 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -879,7 +879,7 @@ public class BlockIndex extends CoarseGrainIndex
   }
 
   /**
-   * Get the index file name of the blocklet data map
+   * Get the index file name of the blocklet index
    *
    * @return
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
index 24ca522..6893876 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.util.BitSetGroup;
 
 /**
- * wrapper for blocklet data map data
+ * wrapper for blocklet index data
  */
 public class BlockletDataRefNode implements DataRefNode {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRow.java
index cf66f97..ca16371 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRow.java
@@ -79,7 +79,7 @@ public abstract class IndexRow implements Serializable {
     for (int i = 0; i < schemas.length; i++) {
       len += getSizeInBytes(i);
     }
-    // for last offset in unsafe data map row
+    // for last offset in unsafe index row
     len += 4;
     return len;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRowImpl.java
index 2d0770d..0cc8f18 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/IndexRowImpl.java
@@ -21,7 +21,7 @@ import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 /**
- * Data map row.
+ * Index row.
  */
 public class IndexRowImpl extends IndexRow {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeIndexRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeIndexRow.java
index b6ebc14..5b9d382 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeIndexRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeIndexRow.java
@@ -24,7 +24,7 @@ import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.getUnsafe;
 
 /**
- * Unsafe implementation of data map row.
+ * Unsafe implementation of index row.
  */
 public class UnsafeIndexRow extends IndexRow {
 
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index b2adc5f..37658df 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -143,7 +143,7 @@ public abstract class MVManager {
   /**
    * Drops the mv schema from storage
    *
-   * @param viewName data map name
+   * @param viewName index name
    */
   public void deleteSchema(String databaseName, String viewName) throws IOException {
     schemaProvider.dropSchema(this, databaseName, viewName);
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index 6593299..429f274 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -366,7 +366,7 @@ public class MVProvider {
   }
 
   /**
-   * Data map schema provider of a database.
+   * Index schema provider of a database.
    */
   private static final class SchemaProvider {
 
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java
index b45aabd..2410579 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/schema/table/TableInfoTest.java
@@ -127,7 +127,7 @@ public class TableInfoTest extends TestCase {
         + "\"tableProperties\":{\"sort_columns\":\"c1\",\"comment\":\"\","
         + "\"local_dictionary_enable\":\"true\"}},\"lastUpdatedTime\":1530534235537,"
         + "\"tablePath\":\"/store/carbonversion_1_1/testinttype1\","
-        + "\"isTransactionalTable\":true,\"dataMapSchemaList\":[],"
+        + "\"isTransactionalTable\":true,"
         + "\"parentRelationIdentifiers\":[],\"isSchemaModified\":false}");
     TableInfo tableInfo = CarbonUtil.convertGsonToTableInfo(properties);
     // the schema evolution should not be null
@@ -164,42 +164,7 @@ public class TableInfoTest extends TestCase {
         + "\":[{\"timeStamp\":1531389794988,\"added\":[],\"removed\":[]}]},"
         + "\"tableProperties\":{\"sort_columns\":\"c1\",\"comment\":\"\"}},"
         + "\"lastUpdatedTime\":1531389794988,\"tablePath\":"
-        + "\"/opt/store/carbonversion_1_3/testinttype3\",\"dataMapSchemaList\":"
-        + "[{\"dataMapName\":\"dm1\",\"className\":"
-        + "\"org.apache.carbondata.core.datamap.AggregateDataMap\","
-        + "\"relationIdentifier\":{\"databaseName\":\"carbonversion_1_3\","
-        + "\"tableName\":\"testinttype3_dm1\",\"tableId\":"
-        + "\"97ccae02-c821-4601-a782-69e715671419\"},\"childSchema\":"
-        + "{\"tableId\":\"97ccae02-c821-4601-a782-69e715671419\",\"tableName\":"
-        + "\"testinttype3_dm1\",\"listOfColumns\":[{\"dataType\":{\"id\":0,"
-        + "\"precedenceOrder\":0,\"name\":\"STRING\",\"sizeInBytes\":-1},"
-        + "\"columnName\":\"testinttype3_c1\",\"columnUniqueId\":"
-        + "\"e72ec46b-f41d-43e8-82d9-9b44714a3f36\",\"columnReferenceId\":"
-        + "\"e72ec46b-f41d-43e8-82d9-9b44714a3f36\",\"isColumnar\":true,\"encodingList"
-        + "\":[\"INVERTED_INDEX\"],\"isDimensionColumn\":true,\"columnGroupId\":-1,"
-        + "\"scale\":-1,\"precision\":-1,\"schemaOrdinal\":0,\"numberOfChild\":0,"
-        + "\"invisible\":false,\"isSortColumn\":true,\"aggFunction\":\"\","
-        + "\"parentColumnTableRelations\":[{\"relationIdentifier\":{\"databaseName\":"
-        + "\"carbonversion_1_3\",\"tableName\":\"testinttype3\",\"tableId\":"
-        + "\"453fa0dd-721d-41b7-9378-f6d6122daf36\"},\"columnId\":"
-        + "\"c84e7e3b-5682-4b46-8c72-0f2f341a0a49\",\"columnName\":\"c1\"}],"
-        + "\"timeSeriesFunction\":\"\"},{\"dataType\":{\"id\":7,\"precedenceOrder\":5,"
-        + "\"name\":\"LONG\",\"sizeInBytes\":8},\"columnName\":\"testinttype3_c2_sum\","
-        + "\"columnUniqueId\":\"4d77c528-c830-4f8b-943b-bac9ee9f9af7\",\"columnReferenceId"
-        + "\":\"4d77c528-c830-4f8b-943b-bac9ee9f9af7\",\"isColumnar\":true,\"encodingList"
-        + "\":[],\"isDimensionColumn\":false,\"columnGroupId\":-1,\"scale\":-1,\"precision"
-        + "\":-1,\"schemaOrdinal\":1,\"numberOfChild\":0,\"invisible\":false,\"isSortColumn"
-        + "\":false,\"aggFunction\":\"sum\",\"parentColumnTableRelations\":"
-        + "[{\"relationIdentifier\":{\"databaseName\":\"carbonversion_1_3\",\"tableName"
-        + "\":\"testinttype3\",\"tableId\":\"453fa0dd-721d-41b7-9378-f6d6122daf36\"},"
-        + "\"columnId\":\"008dc283-beca-4a3e-ad40-b7916aa67795\",\"columnName\":\"c2\"}],"
-        + "\"timeSeriesFunction\":\"\"}],\"schemaEvalution\":{\"schemaEvolutionEntryList\":"
-        + "[{\"timeStamp\":1531389797829,\"added\":[],\"removed\":[]}]},\"tableProperties\""
-        + ":{\"sort_scope\":\"LOCAL_SORT\",\"sort_columns\":\"testinttype3_c1\","
-        + "\"table_blocksize\":\"1024\",\"comment\":\"\"}},\"properties\":"
-        + "{\"QUERYTYPE\":\"AGGREGATION\",\"CHILD_SELECT QUERY\":"
-        + "\"c2VsZWN0IGMxLCBzdW0oYzIpIGZyb20gdGVzdGludHR5cGUzIGdyb3VwIGJ5IGMx\"}}],"
-        + "\"parentRelationIdentifiers\":[],\"isSchemaModified\":false}");
+        + "\"/opt/store/carbonversion_1_3/testinttype3\"}");
     TableInfo tableInfo = CarbonUtil.convertGsonToTableInfo(properties);
     // the schema evolution should not be null
     assertTrue(null != tableInfo.getFactTable());
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index e4a8159..3d81321 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -41,7 +41,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.lock.path | TABLEPATH | This configuration specifies the path where lock files have to be created. Recommended to configure zookeeper lock type or configure HDFS lock path(to this property) in case of S3 file system as locking is not feasible on S3. |
 | enable.offheap.sort | true | Whether carbondata will use offheap or onheap memory. By default, the value is true and carbondata will use the property value from *carbon.unsafe.working.memory.in.mb* or *carbon.unsafe.driver.working.memory.in.mb* as the amount of memory; if it is false, carbondata will use the minimum value between the configured amount of unsafe memory and the 60% of JVM Heap Memory as the amount of memory. |
 | carbon.unsafe.working.memory.in.mb | 512 | CarbonData supports storing data in off-heap memory for certain operations during data loading and query. This helps to avoid the Java GC and thereby improve the overall performance. The Minimum value recommeded is 512MB. Any value below this is reset to default value of 512MB. **NOTE:** The below formulas explain how to arrive at the off-heap size required.<u>Memory Required For Data Loading per executor: </u>(*carbon.number.of.cores.while.lo [...]
-| carbon.unsafe.driver.working.memory.in.mb | (none) | CarbonData supports storing data in unsafe on-heap memory in driver for certain operations like insert into, query for loading datamap cache. The Minimum value recommended is 512MB. If this configuration is not set, carbondata will use the value of `carbon.unsafe.working.memory.in.mb`. |
+| carbon.unsafe.driver.working.memory.in.mb | (none) | CarbonData supports storing data in unsafe on-heap memory in driver for certain operations like insert into, query for loading index cache. The Minimum value recommended is 512MB. If this configuration is not set, carbondata will use the value of `carbon.unsafe.working.memory.in.mb`. |
 | carbon.update.sync.folder | /tmp/carbondata | CarbonData maintains last modification time entries in modifiedTime.mdt to determine the schema changes and reload only when necessary. This configuration specifies the path where the file needs to be written. |
 | carbon.invisible.segments.preserve.count | 200 | CarbonData maintains each data load entry in tablestatus file. The entries from this file are not deleted for those segments that are compacted or dropped, but are made invisible. If the number of data loads are very high, the size and number of entries in tablestatus file can become too many causing unnecessary reading of all data. This configuration specifies the number of segment entries to be maintained afte they are compacted or dro [...]
 | carbon.lock.retries | 3 | CarbonData ensures consistency of operations by blocking certain operations from running in parallel. In order to block the operations from running in parallel, lock is obtained on the table. This configuration specifies the maximum number of retries to obtain the lock for any operations other than load. **NOTE:** Data manupulation operations like Compaction,UPDATE,DELETE  or LOADING,UPDATE,DELETE are not allowed to run in parallel. How ever data loading can h [...]
diff --git a/examples/spark/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala b/examples/spark/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
index 5c2b767..7b09e57 100644
--- a/examples/spark/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
+++ b/examples/spark/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
@@ -104,7 +104,7 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
     TableLevelCompactionOptionExample.exampleBody(spark)
   }
 
-  test("LuceneDataMapExample") {
+  test("LuceneIndexExample") {
     LuceneIndexExample.exampleBody(spark)
   }
 
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 8d77b92..bca03f8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -427,7 +427,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       List<PartitionSpec> partitions, boolean isUpdateFlow) throws IOException {
     // Normal query flow goes to CarbonInputFormat#getPrunedBlocklets and initialize the
     // pruning info for table we queried. But here count star query without filter uses a different
-    // query plan, and no pruning info is initialized. When it calls default data map to
+    // query plan, and no pruning info is initialized. When it calls default index to
     // prune(with a null filter), exception will occur during setting pruning info.
     // Considering no useful information about block/blocklet pruning for such query
     // (actually no pruning), so we disable explain collector here
diff --git a/index/examples/src/minmaxindex/main/java/org/apache/carbondata/index/examples/MinMaxIndexIndexFactory.java b/index/examples/src/minmaxindex/main/java/org/apache/carbondata/index/examples/MinMaxIndexIndexFactory.java
index c3d5dfd..b3d99cd 100644
--- a/index/examples/src/minmaxindex/main/java/org/apache/carbondata/index/examples/MinMaxIndexIndexFactory.java
+++ b/index/examples/src/minmaxindex/main/java/org/apache/carbondata/index/examples/MinMaxIndexIndexFactory.java
@@ -97,7 +97,7 @@ public class MinMaxIndexIndexFactory extends CoarseGrainIndexFactory {
   }
 
   /**
-   * getSecondaryIndexes Factory method Initializes the Min Max Data Map and returns.
+   * getSecondaryIndexes Factory method Initializes the Min Max Index and returns.
    *
    * @param segment
    * @return
@@ -140,7 +140,7 @@ public class MinMaxIndexIndexFactory extends CoarseGrainIndexFactory {
   }
 
   /**
-   * Clearing the data map.
+   * Clearing the index.
    */
   @Override
   public void clear() {
diff --git a/index/examples/src/minmaxindex/test/scala/org/apache/carbondata/datamap/examples/MinMaxIndexSuite.scala b/index/examples/src/minmaxindex/test/scala/org/apache/carbondata/index/examples/MinMaxIndexSuite.scala
similarity index 100%
rename from index/examples/src/minmaxindex/test/scala/org/apache/carbondata/datamap/examples/MinMaxIndexSuite.scala
rename to index/examples/src/minmaxindex/test/scala/org/apache/carbondata/index/examples/MinMaxIndexSuite.scala
diff --git a/index/lucene/src/main/java/org/apache/carbondata/index/lucene/LuceneFineGrainIndex.java b/index/lucene/src/main/java/org/apache/carbondata/index/lucene/LuceneFineGrainIndex.java
index 09572d0..3351c16 100644
--- a/index/lucene/src/main/java/org/apache/carbondata/index/lucene/LuceneFineGrainIndex.java
+++ b/index/lucene/src/main/java/org/apache/carbondata/index/lucene/LuceneFineGrainIndex.java
@@ -102,7 +102,7 @@ public class LuceneFineGrainIndex extends FineGrainIndex {
   }
 
   /**
-   * It is called to load the data map to memory or to initialize it.
+   * It is called to load the index to memory or to initialize it.
    */
   public void init(IndexModel indexModel) throws IOException {
     long startTime = System.currentTimeMillis();
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index 73284ff..ac17df2 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -37,30 +37,20 @@ import org.apache.spark.sql.test.util.QueryTest
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.scalatest.BeforeAndAfterAll
 
-class TestCarbonPartitionWriter extends QueryTest {
+class TestCarbonPartitionWriter extends QueryTest with BeforeAndAfterAll{
 
   val tableName = "test_flink_partition"
+  val dataTempPath = targetTestClass + "/data/temp/"
 
   test("Writing flink data to local partition carbon table") {
-    sql(s"DROP TABLE IF EXISTS $tableName").collect()
-    sql(
-      s"""
-         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
-         | STORED AS carbondata
-         | PARTITIONED BY (hour_ string, date_ string)
-         | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
-      """.stripMargin
-    ).collect()
-
-    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
-
-    val dataTempPath = rootPath + "/data/temp/"
-
+    createPartitionTable
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
-
-      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
@@ -69,53 +59,13 @@ class TestCarbonPartitionWriter extends QueryTest {
       environment.setRestartStrategy(RestartStrategies.noRestart)
 
       val dataCount = 1000
-      val source = new TestSource(dataCount) {
-        @throws[InterruptedException]
-        override def get(index: Int): Array[AnyRef] = {
-          val data = new Array[AnyRef](7)
-          data(0) = "test" + index
-          data(1) = index.asInstanceOf[AnyRef]
-          data(2) = 12345.asInstanceOf[AnyRef]
-          data(3) = Integer.toString(TestSource.randomCache.get().nextInt(24))
-          data(4) = "20191218"
-          data
-        }
-
-        @throws[InterruptedException]
-        override def onFinish(): Unit = {
-          Thread.sleep(5000L)
-        }
-      }
-      val stream = environment.addSource(source)
-      val factory = CarbonWriterFactory.builder("Local").build(
-        "default",
-        tableName,
-        tablePath,
-        new Properties,
-        writerProperties,
-        carbonProperties
-      )
-      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
-
-      stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
-        override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return hour_
-      }).addSink(streamSink)
-
-      try environment.execute
-      catch {
-        case exception: Exception =>
-          // TODO
-          throw new UnsupportedOperationException(exception)
-      }
-      assertResult(false)(FileFactory
-        .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
+      val source = getTestSource(dataCount)
+      executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
 
       sql(s"INSERT INTO $tableName STAGE")
 
       checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
 
-    } finally {
-      sql(s"DROP TABLE IF EXISTS $tableName").collect()
     }
   }
 
@@ -130,15 +80,9 @@ class TestCarbonPartitionWriter extends QueryTest {
          | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
       """.stripMargin
     ).collect()
-
-    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
-
-    val dataTempPath = rootPath + "/data/temp/"
-
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
-
-      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
@@ -166,29 +110,7 @@ class TestCarbonPartitionWriter extends QueryTest {
           Thread.sleep(5000L)
         }
       }
-      val stream = environment.addSource(source)
-      val factory = CarbonWriterFactory.builder("Local").build(
-        "default",
-        tableName,
-        tablePath,
-        new Properties,
-        writerProperties,
-        carbonProperties
-      )
-      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
-
-      stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
-        override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return hour_
-      }).addSink(streamSink)
-
-      try environment.execute
-      catch {
-        case exception: Exception =>
-          // TODO
-          throw new UnsupportedOperationException(exception)
-      }
-      assertResult(false)(FileFactory
-        .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
+      executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
 
       sql(s"INSERT INTO $tableName STAGE")
 
@@ -201,14 +123,144 @@ class TestCarbonPartitionWriter extends QueryTest {
       assertResult(1)(rows.length)
       assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0))
 
-    } finally {
-      sql(s"DROP TABLE IF EXISTS $tableName").collect()
     }
   }
 
+  test("test insert stage into partition carbon table with secondary index") {
+    createPartitionTable
+    // create si index
+    sql(s"drop index if exists si_1 on $tableName")
+    sql(s"create index si_1 on $tableName(stringField1) as 'carbondata'")
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(6)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 10
+      val source = getTestSource(dataCount)
+      executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
+
+      sql(s"INSERT INTO $tableName STAGE")
+
+      checkAnswer(sql("select count(*) from si_1"), Seq(Row(10))  )
+      checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(10)))
+      // check if query hits si
+      val df = sql(s"select stringField, intField from $tableName where stringField1 = 'si1'")
+      checkAnswer(df, Seq(Row("test1", 1)))
+      var isFilterHitSecondaryIndex = false
+      df.queryExecution.sparkPlan.transform {
+        case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
+          isFilterHitSecondaryIndex = true
+          broadCastSIFilterPushDown
+      }
+      assert(isFilterHitSecondaryIndex)
+    }
+  }
+
+  test("test insert stage into partition carbon table with materialized view") {
+    createPartitionTable
+    // create materialized view
+    sql(s"drop materialized view if exists mv_1")
+    sql(s"create materialized view mv_1 as select stringField, shortField from $tableName where intField=9")
+
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(6)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 10
+      val source = getTestSource(dataCount)
+      executeStreamingEnvironment(tablePath, writerProperties, carbonProperties, environment, source)
+
+      sql(s"INSERT INTO $tableName STAGE")
+
+      checkAnswer(sql(s"SELECT count(*) FROM mv_1"), Seq(Row(1)))
+      val df = sql(s"select stringField, shortField from $tableName where intField=9")
+      val tables = df.queryExecution.optimizedPlan collect {
+        case l: LogicalRelation => l.catalogTable.get
+      }
+      assert(tables.exists(_.identifier.table.equalsIgnoreCase("mv_1")))
+      checkAnswer(df, Seq(Row("test9",12345)))
+
+    }
+  }
+
+  private def getTestSource(dataCount: Int): TestSource = {
+    new TestSource(dataCount) {
+      @throws[InterruptedException]
+      override def get(index: Int): Array[AnyRef] = {
+        val data = new Array[AnyRef](7)
+        data(0) = "test" + index
+        data(1) = index.asInstanceOf[AnyRef]
+        data(2) = 12345.asInstanceOf[AnyRef]
+        data(3) = "si" + index
+        data(4) = Integer.toString(TestSource.randomCache.get().nextInt(24))
+        data
+      }
+
+      @throws[InterruptedException]
+      override def onFinish(): Unit = {
+        Thread.sleep(5000L)
+      }
+    }
+  }
+
+  private def createPartitionTable = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short, stringField1 string)
+         | STORED AS carbondata
+         | PARTITIONED BY (hour_ string)
+         | TBLPROPERTIES ('SORT_COLUMNS'='hour_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  private def executeStreamingEnvironment(tablePath: String,
+      writerProperties: Properties,
+      carbonProperties: Properties,
+      environment: StreamExecutionEnvironment,
+      source: TestSource): Unit = {
+    val stream = environment.addSource(source)
+    val factory = CarbonWriterFactory.builder("Local").build(
+      "default",
+      tableName,
+      tablePath,
+      new Properties,
+      writerProperties,
+      carbonProperties)
+    val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+    stream.keyBy(new KeySelector[Array[AnyRef], AnyRef] {
+      override def getKey(value: Array[AnyRef]): AnyRef = value(3) // return hour_
+    }).addSink(streamSink)
+
+    try environment.execute
+    catch {
+      case exception: Exception =>
+        // TODO
+        throw new UnsupportedOperationException(exception)
+    }
+    assertResult(false)(FileFactory
+      .getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
+  }
+
   private def newWriterProperties(
-     dataTempPath: String,
-     storeLocation: String) = {
+     dataTempPath: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
     properties
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 396703d..c40273d 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -19,12 +19,15 @@ package org.apache.carbon.flink
 
 import java.util.Properties
 
+import org.apache.flink.api.common.JobExecutionResult
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 
@@ -32,72 +35,25 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql.execution.exchange.Exchange
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.scalatest.BeforeAndAfterAll
 
-class TestCarbonWriter extends QueryTest {
+class TestCarbonWriter extends QueryTest with BeforeAndAfterAll{
 
   val tableName = "test_flink"
   val bucketTableName = "insert_bucket_table"
+  val dataTempPath: String = targetTestClass + "/data/temp/"
 
   test("Writing flink data to local carbon table") {
-    sql(s"DROP TABLE IF EXISTS $tableName").collect()
-    sql(
-      s"""
-         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
-         | STORED AS carbondata
-      """.stripMargin
-    ).collect()
-
-    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
-
-    val dataTempPath = rootPath + "/data/temp/"
-
+    createTable
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
-
-      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
-      environment.setParallelism(1)
       environment.enableCheckpointing(2000L)
-      environment.setRestartStrategy(RestartStrategies.noRestart)
-
-      val dataCount = 1000
-      val source = new TestSource(dataCount) {
-        @throws[InterruptedException]
-        override def get(index: Int): Array[AnyRef] = {
-          Thread.sleep(1L)
-          val data = new Array[AnyRef](3)
-          data(0) = "test" + index
-          data(1) = index.asInstanceOf[AnyRef]
-          data(2) = 12345.asInstanceOf[AnyRef]
-          data
-        }
-
-        @throws[InterruptedException]
-        override def onFinish(): Unit = {
-          Thread.sleep(5000L)
-        }
-      }
-      val stream = environment.addSource(source)
-      val factory = CarbonWriterFactory.builder("Local").build(
-        "default",
-        tableName,
-        tablePath,
-        new Properties,
-        writerProperties,
-        carbonProperties
-      )
-      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
-
-      stream.addSink(streamSink)
-
-      try environment.execute
-      catch {
-        case exception: Exception =>
-          // TODO
-          throw new UnsupportedOperationException(exception)
-      }
+      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
 
       checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(0)))
 
@@ -113,76 +69,19 @@ class TestCarbonWriter extends QueryTest {
 
       checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(1000)))
       checkAnswer(sql(s"select count(intField) from $tableName where intField >= 900"), Seq(Row(100)))
-
-      // ensure the stage snapshot file and all stage files are deleted
-      assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
-      assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
-
-    } finally {
-      sql(s"DROP TABLE IF EXISTS $tableName").collect()
+      checkIfStageFilesAreDeleted(tablePath)
     }
   }
 
   test("test batch_file_count option") {
-    sql(s"DROP TABLE IF EXISTS $tableName").collect()
-    sql(
-      s"""
-         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
-         | STORED AS carbondata
-      """.stripMargin
-    ).collect()
-
-    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
-
-    val dataTempPath = rootPath + "/data/temp/"
-
+    createTable
     try {
-      val tablePath = storeLocation + "/" + tableName + "/"
-
-      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath)
       val carbonProperties = newCarbonProperties(storeLocation)
-
       writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
-      environment.setParallelism(1)
-      environment.setRestartStrategy(RestartStrategies.noRestart)
-
-      val dataCount = 1000
-      val source = new TestSource(dataCount) {
-        @throws[InterruptedException]
-        override def get(index: Int): Array[AnyRef] = {
-          val data = new Array[AnyRef](3)
-          data(0) = "test" + index
-          data(1) = index.asInstanceOf[AnyRef]
-          data(2) = 12345.asInstanceOf[AnyRef]
-          data
-        }
-
-        @throws[InterruptedException]
-        override def onFinish(): Unit = {
-          Thread.sleep(5000L)
-        }
-      }
-      val stream = environment.addSource(source)
-      val factory = CarbonWriterFactory.builder("Local").build(
-        "default",
-        tableName,
-        tablePath,
-        new Properties,
-        writerProperties,
-        carbonProperties
-      )
-      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
-
-      stream.addSink(streamSink)
-
-      try environment.execute
-      catch {
-        case exception: Exception =>
-          // TODO
-          throw new UnsupportedOperationException(exception)
-      }
+      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
 
       sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
 
@@ -191,8 +90,6 @@ class TestCarbonWriter extends QueryTest {
       sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
 
       checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
-    } finally {
-      sql(s"DROP TABLE IF EXISTS $tableName").collect()
     }
   }
 
@@ -201,67 +98,24 @@ class TestCarbonWriter extends QueryTest {
     sql(s"DROP TABLE IF EXISTS $bucketTableName").collect()
     sql(
       s"""
-         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short, stringField1 string)
          | STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField')
       """.stripMargin
     ).collect()
     sql(
       s"""
-         | CREATE TABLE $bucketTableName (stringField string, intField int, shortField short)
+         | CREATE TABLE $bucketTableName (stringField string, intField int, shortField short, stringField1 string)
          | STORED AS carbondata TBLPROPERTIES ('BUCKET_NUMBER'='10', 'BUCKET_COLUMNS'='stringField')
       """.stripMargin
     ).collect()
-
-    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
-
-    val dataTempPath = rootPath + "/data/temp/"
-
     try {
-      val flinkTablePath = storeLocation + "/" + tableName + "/"
-
-      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath)
       val carbonProperties = newCarbonProperties(storeLocation)
-
       writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
-      environment.setParallelism(1)
-      environment.setRestartStrategy(RestartStrategies.noRestart)
-
-      val dataCount = 1000
-      val source = new TestSource(dataCount) {
-        @throws[InterruptedException]
-        override def get(index: Int): Array[AnyRef] = {
-          val data = new Array[AnyRef](3)
-          data(0) = "test" + index
-          data(1) = index.asInstanceOf[AnyRef]
-          data(2) = 12345.asInstanceOf[AnyRef]
-          data
-        }
-
-        @throws[InterruptedException]
-        override def onFinish(): Unit = {
-          Thread.sleep(5000L)
-        }
-      }
-      val stream = environment.addSource(source)
-      val factory = CarbonWriterFactory.builder("Local").build(
-        "default",
-        tableName,
-        flinkTablePath,
-        new Properties,
-        writerProperties,
-        carbonProperties
-      )
-      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
-
-      stream.addSink(streamSink)
-
-      try environment.execute
-      catch {
-        case exception: Exception =>
-          throw new UnsupportedOperationException(exception)
-      }
+      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
+
       sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
       val table = CarbonEnv.getCarbonTable(Option("default"), s"$tableName")(sqlContext.sparkSession)
       val segmentDir = FileFactory.getCarbonFile(table.getTablePath + "/Fact/Part0/Segment_0")
@@ -301,16 +155,156 @@ class TestCarbonWriter extends QueryTest {
            |from $tableName t1, $bucketTableName t2
            |where t1.stringField = t2.stringField) temp
       """.stripMargin), Row(1000))
-    } finally {
-      sql(s"DROP TABLE IF EXISTS $tableName").collect()
-      sql(s"DROP TABLE IF EXISTS $bucketTableName").collect()
     }
   }
 
+  test("test insert stage command with secondary index and bloomfilter") {
+    createTable
+    // create si and bloom index
+    sql(s"drop index if exists si_1 on $tableName")
+    sql(s"drop index if exists bloom_1 on $tableName")
+    sql(s"create index si_1 on $tableName(stringField1) as 'carbondata'")
+    sql(s"create index bloom_1 on $tableName(intField) as 'bloomfilter'")
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.enableCheckpointing(2000L)
+      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
+
+      // check count before insert stage
+      checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(0)))
+      checkAnswer(sql("select count(*) from si_1"), Seq(Row(0)))
+
+      sql(s"INSERT INTO $tableName STAGE")
+
+      checkAnswer(sql("select count(*) from si_1"), Seq(Row(1000)))
+      checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(1000)))
+      // check if query hits si
+      val df = sql(s"select stringField, intField from $tableName where stringField1 = 'si12'")
+      checkAnswer(df, Seq(Row("test12", 12)))
+      var isFilterHitSecondaryIndex = false
+      df.queryExecution.sparkPlan.transform {
+        case broadCastSIFilterPushDown: BroadCastSIFilterPushJoin =>
+          isFilterHitSecondaryIndex = true
+          broadCastSIFilterPushDown
+      }
+      assert(isFilterHitSecondaryIndex)
+
+      // check if query hits bloom filter
+      checkAnswer(sql(s"select intField,stringField1 from $tableName where intField = 99"), Seq(Row(99, "si99")))
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+      val explainBloom = sql(s"explain select intField,stringField1 from $tableName where intField = 99").collect()
+      assert(explainBloom(0).getString(0).contains(
+        """
+          |Table Scan on test_flink
+          | - total: 1 blocks, 1 blocklets
+          | - filter: (intfield <> null and intfield = 99)
+          | - pruned by Main Index
+          |    - skipped: 0 blocks, 0 blocklets
+          | - pruned by CG Index
+          |    - name: bloom_1
+          |    - provider: bloomfilter
+          |    - skipped: 0 blocks, 0 blocklets""".stripMargin))
+      checkIfStageFilesAreDeleted(tablePath)
+    }
+  }
+
+  test("test insert stage command with materilaized view") {
+    createTable
+    // create materialized view
+    sql(s"drop materialized view if exists mv_1")
+    sql(s"create materialized view mv_1 as select stringField, shortField from $tableName where intField=99 ")
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+      val writerProperties = newWriterProperties(dataTempPath)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.enableCheckpointing(2000L)
+      executeFlinkStreamingEnvironment(environment, writerProperties, carbonProperties)
+      // check count before insert stage
+      checkAnswer(sql(s"SELECT count(*) FROM $tableName"), Seq(Row(0)))
+      checkAnswer(sql(s"SELECT count(*) FROM mv_1"), Seq(Row(0)))
+
+      sql(s"INSERT INTO $tableName STAGE")
+      checkAnswer(sql(s"SELECT count(*) FROM mv_1"), Seq(Row(1)))
+      val df = sql(s"select stringField, shortField from $tableName where intField=99")
+      val tables = df.queryExecution.optimizedPlan collect {
+        case l: LogicalRelation => l.catalogTable.get
+      }
+      assert(tables.exists(_.identifier.table.equalsIgnoreCase("mv_1")))
+      checkAnswer(df, Seq(Row("test99",12345)))
+      checkIfStageFilesAreDeleted(tablePath)
+    }
+  }
+
+  private def executeFlinkStreamingEnvironment(environment: StreamExecutionEnvironment,
+      writerProperties: Properties,
+      carbonProperties: Properties): JobExecutionResult = {
+    val tablePath = storeLocation + "/" + tableName + "/"
+    environment.setParallelism(1)
+    environment.setRestartStrategy(RestartStrategies.noRestart)
+    val dataCount = 1000
+    val source = new TestSource(dataCount) {
+      @throws[InterruptedException]
+      override def get(index: Int): Array[AnyRef] = {
+        Thread.sleep(1L)
+        val data = new Array[AnyRef](4)
+        data(0) = "test" + index
+        data(1) = index.asInstanceOf[AnyRef]
+        data(2) = 12345.asInstanceOf[AnyRef]
+        data(3) = "si" + index
+        data
+      }
+      @throws[InterruptedException]
+      override def onFinish(): Unit = {
+        Thread.sleep(5000L)
+      }
+    }
+    val stream = environment.addSource(source)
+    val factory = CarbonWriterFactory.builder("Local").build(
+      "default",
+      tableName,
+      tablePath,
+      new Properties,
+      writerProperties,
+      carbonProperties)
+    val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+    stream.addSink(streamSink)
+
+    try environment.execute
+    catch {
+      case exception: Exception =>
+        // TODO
+        throw new UnsupportedOperationException(exception)
+    }
+  }
+
+  private def checkIfStageFilesAreDeleted(tablePath: String): Unit = {
+    // ensure the stage snapshot file and all stage files are deleted
+    assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
+    assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
+  }
+
+  private def createTable = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(s"""
+           | CREATE TABLE $tableName (stringField string, intField int, shortField short, stringField1 string)
+           | STORED AS carbondata
+      """.stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+        CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(s"DROP TABLE IF EXISTS $bucketTableName").collect()
+  }
 
-  private def newWriterProperties(
-    dataTempPath: String,
-    storeLocation: String) = {
+  private def newWriterProperties(dataTempPath: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
     properties
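For reference, the writer wiring exercised above can be condensed into the sketch below (the buildLocalCarbonSink helper and the import lines are assumptions of this sketch; the CarbonWriterFactory, CarbonLocalProperty and ProxyFileSystem calls are taken verbatim from the test):

    import java.util.Properties

    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    import org.apache.carbon.flink.{CarbonLocalProperty, CarbonWriterFactory, ProxyFileSystem}

    // builds a bulk-format sink that writes carbon stage files for a local table
    def buildLocalCarbonSink(
        tableName: String,
        tablePath: String,
        dataTempPath: String,
        carbonProperties: Properties) = {
      val writerProperties = new Properties
      writerProperties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
      val factory = CarbonWriterFactory.builder("Local").build(
        "default", tableName, tablePath, new Properties, writerProperties, carbonProperties)
      // rows written through this sink are Array[AnyRef], matching TestSource above
      StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
    }

The resulting sink is then attached with stream.addSink(streamSink), exactly as done in executeFlinkStreamingEnvironment.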
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
index aa09696..a13021c 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/LuceneTestCase.scala
@@ -31,11 +31,11 @@ class LuceneTestCase extends QueryTest with BeforeAndAfterAll {
   val csvPath = s"$resourcesPath/source.csv"
 
   override protected def beforeAll(): Unit = {
-    sql("DROP TABLE IF EXISTS datamap_main")
+    sql("DROP TABLE IF EXISTS index_main")
   }
 
   //Create Lucene Index With DMProperties(String DataType) on MainTable
-  test("LuceneDataMap_TC001", Include) {
+  test("Luceneindex_TC001", Include) {
     sql("DROP TABLE IF EXISTS index_main")
     sql(
       "CREATE TABLE index_main (id Int, date date, country string,name String, phonetype " +
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/DropDataMapEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/DropIndexEvents.scala
similarity index 100%
rename from integration/spark/src/main/scala/org/apache/carbondata/events/DropDataMapEvents.scala
rename to integration/spark/src/main/scala/org/apache/carbondata/events/DropIndexEvents.scala
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a8fda4d..913fe1f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -625,7 +625,7 @@ object CarbonDataRDDFactory {
         true
       } catch {
         case ex: Exception =>
-          LOGGER.error("Problem while committing data maps", ex)
+          LOGGER.error("Problem while committing indexes", ex)
           false
       }
       if (!done || !commitComplete) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 152c6e7..0502d57 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -33,12 +33,11 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil
 import org.apache.carbondata.streaming.segment.StreamSegment
@@ -109,10 +108,6 @@ object StreamSinkFactory {
     val segmentId = getStreamSegmentId(carbonTable)
     carbonLoadModel.setSegmentId(segmentId)
 
-    // Used to generate load commands for child tables in case auto-handoff is fired.
-    val loadMetaEvent = new LoadMetadataEvent(carbonTable, false, parameters.asJava)
-    OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
-
     // default is carbon appended stream sink
     val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
       sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index de954e6..af2ff3c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -103,7 +103,7 @@ class CarbonSession(@transient val sc: SparkContext,
    */
   @InterfaceAudience.Developer(Array("Index"))
   def isIndexHit(sqlStatement: String, indexName: String): Boolean = {
-    // explain command will output the dataMap information only if enable.query.statistics = true
+    // explain command will output the index information only if enable.query.statistics = true
     val message = sql(s"EXPLAIN $sqlStatement").collect()
     message(0).getString(0).contains(indexName)
   }
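For illustration, isIndexHit can be exercised as in the sketch below (the carbonSession value, table name and query are placeholders; only the API and the enable.query.statistics property come from the code above):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // EXPLAIN only prints index information when query statistics are enabled
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
    // returns true when the EXPLAIN output of the statement mentions the given index/MV name
    val hit = carbonSession.isIndexHit(
      "SELECT stringField, shortField FROM test_table WHERE intField = 99", "mv_1")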
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 14fa4ac..2995edc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,6 +29,8 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
@@ -42,6 +44,11 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case preStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
+        // skip merge index in case of insert stage flow
+        if (null != operationContext.getProperty(CarbonCommonConstants.IS_INSERT_STAGE) &&
+          operationContext.getProperty(CarbonCommonConstants.IS_INSERT_STAGE).equals("true")) {
+          return
+        }
         LOGGER.info("Load post status event-listener called for merge index")
         val loadModel = preStatusUpdateEvent.getCarbonLoadModel
         val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -90,7 +97,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
             loadModel.setMetrics(metrics)
             LOGGER.info("Total time taken for merge index " +
                         (System.currentTimeMillis() - startTime))
-            // clear Block dataMap Cache
+            // clear Block index Cache
             MergeIndexUtil.clearBlockIndexCache(carbonTable, Seq(loadModel.getSegmentId))
           }
         }
@@ -148,7 +155,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                 readFileFooterFromCarbonDataFile = true)
               LOGGER.info("Total time taken for merge index "
                           + (System.currentTimeMillis() - startTime) + "ms")
-              // clear Block dataMap Cache
+              // clear Block index Cache
               MergeIndexUtil.clearBlockIndexCache(carbonMainTable, segmentsToMerge)
               val requestMessage = "Compaction request completed for table " +
                 s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 085ed80..e7fd07e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -67,8 +67,8 @@ case class CarbonShowCacheCommand(showExecutorCache: Boolean,
       } else {
         Seq(
           AttributeReference("Identifier", StringType, nullable = false)(),
-          AttributeReference("Index size", StringType, nullable = false)(),
-          AttributeReference("Datamap size", StringType, nullable = false)(),
+          AttributeReference("Table Index size", StringType, nullable = false)(),
+          AttributeReference("CgAndFg Index size", StringType, nullable = false)(),
           AttributeReference("Cache Location", StringType, nullable = false)())
       }
     } else {
@@ -217,9 +217,9 @@ case class CarbonShowCacheCommand(showExecutorCache: Boolean,
         carbonTable =>
           carbonTable.getTablePath
       }
-      val (driverIndexSize, driverTableIndexSize) = getAllDriverCacheSize(tablePaths.toList)
-      if (driverIndexSize + driverTableIndexSize != 0 && driverRows.nonEmpty) {
-        (Seq(Row("TOTAL", driverIndexSize, driverTableIndexSize, "DRIVER")) ++
+      val (driverIndexSize, allCgAndFgIndexSize) = getAllDriverCacheSize(tablePaths.toList)
+      if (driverIndexSize + allCgAndFgIndexSize != 0 && driverRows.nonEmpty) {
+        (Seq(Row("TOTAL", driverIndexSize, allCgAndFgIndexSize, "DRIVER")) ++
          driverRows).collect {
           case row if row.getLong(1) != 0L || row.getLong(2) != 0L =>
             Row(row(0), bytesToDisplaySize(row.getLong(1)),
@@ -393,7 +393,6 @@ case class CarbonShowCacheCommand(showExecutorCache: Boolean,
     (sparkSession: SparkSession): List[(String, String, String)] = {
     val showTableCacheEvent = ShowTableCacheEvent(carbonTable, sparkSession, internalCall)
     val operationContext = new OperationContext
-    // datamapName -> (datamapProviderName, indexSize, datamapSize)
     operationContext.setProperty(carbonTable.getTableUniqueName, List())
     OperationListenerBus.getInstance.fireEvent(showTableCacheEvent, operationContext)
     operationContext.getProperty(carbonTable.getTableUniqueName)
@@ -404,21 +403,21 @@ case class CarbonShowCacheCommand(showExecutorCache: Boolean,
     val cache = CacheProvider.getInstance().getCarbonCache
     // Scan whole cache and fill the entries for All-Database-All-Tables
     // and Current-Database-All-Tables
-    var (allIndexSize, allDatamapSize) = (0L, 0L)
+    var (allTableIndexSize, allCgAndFgIndexSize) = (0L, 0L)
     var dbIndexSize = 0L
     cache.getCacheMap.asScala.foreach {
       case (key, cacheable) =>
         cacheable match {
           case _: BlockletIndexWrapper =>
-            allIndexSize += cacheable.getMemorySize
+            allTableIndexSize += cacheable.getMemorySize
             if (tablePaths.exists { path => key.startsWith(path) }) {
               dbIndexSize += cacheable.getMemorySize
             }
           case _ =>
-            allDatamapSize += cacheable.getMemorySize
+            allCgAndFgIndexSize += cacheable.getMemorySize
         }
     }
-    (allIndexSize, allDatamapSize)
+    (allTableIndexSize, allCgAndFgIndexSize)
   }
 
   private def collectDriverMetaCacheInfo(tableName: String,
@@ -449,18 +448,18 @@ case class CarbonShowCacheCommand(showExecutorCache: Boolean,
   }
 
   private def getIndexServerCacheSizeForCurrentDB: (Long, Long) = {
-    var (allIndexSize, allDatamapSize) = (0L, 0L)
+    var (allIndexSize, allCgAndFgIndexSize) = (0L, 0L)
     val bloomFilterIdentifier = IndexType.BLOOMFILTER.getIndexProviderName
     cacheResult.foreach {
       case (_, _, sum, provider) =>
         provider.toLowerCase match {
           case `bloomFilterIdentifier` =>
-            allDatamapSize += sum
+            allCgAndFgIndexSize += sum
           case _ =>
             allIndexSize += sum
         }
     }
-    (allIndexSize, allDatamapSize)
+    (allIndexSize, allCgAndFgIndexSize)
   }
 
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index f7e8b42..facbb58 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -314,7 +314,7 @@ case class CarbonAddLoadCommand(
         true
       } catch {
         case ex: Exception =>
-          LOGGER.error("Problem while committing data maps", ex)
+          LOGGER.error("Problem while committing indexes", ex)
           false
       }
       commitComplete
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index dd1efe8..e22b89e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -42,8 +42,10 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
@@ -310,11 +312,53 @@ case class CarbonInsertFromStageCommand(
       // 4) write segment file and update the segment entry to SUCCESS
       val segmentFileName = SegmentFileStore.writeSegmentFile(
         table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
-      SegmentFileStore.updateTableStatusFile(
+      // create operationContext to fire load events
+      val operationContext: OperationContext = new OperationContext
+      val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
+        sparkSession = spark,
+        carbonLoadModel = loadModel,
+        uuid = "",
+        factPath = "",
+        optionsFinal = options.asJava,
+        options = options.asJava,
+        isOverwriteTable = false,
+        isDataFrame = true,
+        updateModel = None,
+        operationContext = operationContext)
+      // in case of insert stage files, set the below property to skip the merge index step
+      // and to fire the event that loads data to the secondary index
+      operationContext.setProperty(CarbonCommonConstants.IS_INSERT_STAGE, "true")
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        new LoadTablePreStatusUpdateEvent(
+          table.getCarbonTableIdentifier,
+          loadModel)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
+      val status = SegmentFileStore.updateTableStatusFile(
         table, loadModel.getSegmentId, segmentFileName,
         table.getCarbonTableIdentifier.getTableId,
         new SegmentFileStore(table.getTablePath, segmentFileName),
         SegmentStatus.SUCCESS)
+
+      // trigger load post events
+      if (status) {
+        val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+          new LoadTablePostStatusUpdateEvent(loadModel)
+        try {
+          OperationListenerBus.getInstance()
+            .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+        } catch {
+          case ex: Exception =>
+            LOGGER.error("Problem while committing indexes", ex)
+        }
+      }
+      // fire event to load data to materialized views and merge bloom index files
+      CommonLoadUtils.firePostLoadEvents(spark,
+        loadModel,
+        tableIndexes,
+        indexOperationContext,
+        table,
+        operationContext)
     } catch {
       case ex: Throwable =>
         LOGGER.error(s"failed to insert ${table.getDatabaseName}.${table.getTableName}", ex)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index f0b2287..919ce91 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -99,7 +99,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
         throw new ConcurrentOperationException(carbonTable, "loading", "alter table rename")
       }
-      // invalid data map for the old table, see CARBON-1690
+      // invalidate the index cache of the old table, see CARBON-1690
       val oldAbsoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       IndexStoreManager.getInstance().clearIndex(oldAbsoluteTableIdentifier)
       // get the latest carbon table and check for column existence
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexDetailsWithSchema.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexDetailsWithSchema.java
index 4f396e4..ee7ff7e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexDetailsWithSchema.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexDetailsWithSchema.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.BlockIndex;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 
 /**
- * class that holds dataMaps, column cardinality, columnSchema and other related information for
+ * class that holds indexes, column cardinality, columnSchema and other related information for
  * BlockletIndexInputFormate return value
  * TODO: When this code is moved to open source, this class can be removed and the required code
  * can be added to BlockletIndexWrapper class
@@ -44,8 +44,8 @@ public class BlockletIndexDetailsWithSchema implements Serializable {
     this.blockletIndexWrapper = blockletIndexWrapper;
     List<BlockIndex> indexes = blockletIndexWrapper.getIndexes();
     if (!indexes.isEmpty()) {
-      // In one task all dataMaps will have the same cardinality and schema therefore
-      // segmentPropertyIndex can be fetched from one dataMap
+      // In one task all indexes will have the same cardinality and schema, therefore
+      // segmentPropertyIndex can be fetched from one index
       SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
           segmentPropertiesWrapper = indexes.get(0).getSegmentPropertiesWrapper();
       // flag to check whether carbon table schema is modified. ColumnSchemaList will be
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java
index 71f445d..32cade2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/BlockletIndexInputFormat.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.log4j.Logger;
 
 /**
- * class to load blocklet data map
+ * class to load blocklet index
  */
 public class BlockletIndexInputFormat
     extends FileInputFormat<TableBlockIndexUniqueIdentifier, BlockletIndexDetailsWithSchema>
@@ -179,7 +179,7 @@ public class BlockletIndexInputFormat
             String segmentId =
                 tableBlockIndexUniqueIdentifierWrapper.getTableBlockIndexUniqueIdentifier()
                     .getSegmentId();
-            // as segmentId will be same for all the dataMaps and segmentProperties cache is
+            // as segmentId will be the same for all the indexes and segmentProperties cache is
             // maintained at segment level so it need to be called only once for clearing
             SegmentPropertiesAndSchemaHolder.getInstance()
                 .invalidate(segmentId, wrapper.getIndexes().get(0).getSegmentPropertiesWrapper(),
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/SparkBlockletIndexLoaderJob.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/SparkBlockletIndexLoaderJob.scala
index 8327fcf..288c1e0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/SparkBlockletIndexLoaderJob.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/Jobs/SparkBlockletIndexLoaderJob.scala
@@ -119,7 +119,7 @@ class IndexCacher(
         TableBlockIndexUniqueIdentifierWrapper(
           indexWrapper._1,
           carbonTable)
-    // add dataMap to cache
+    // add indexes to cache
     cacheableIndex
       .cache(tableBlockIndexUniqueIdentifierWrapper,
         indexWrapper._2.getBlockletIndexWrapper)
@@ -134,7 +134,7 @@ class IndexLoaderPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
 }
 
 /**
- * This RDD is used to load the dataMaps of a segment
+ * This RDD is used to load the indexes of a segment
  *
  * @param ss
  * @param indexFormat
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index 4cc6cb1..ef9d1b2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -121,6 +121,7 @@ class QueryTest extends PlanTest {
   val integrationPath = TestQueryExecutor.integrationPath
   val dblocation = TestQueryExecutor.location
   val defaultParallelism = sqlContext.sparkContext.defaultParallelism
+  val targetTestClass = System.getProperty("user.dir") + "/target/test-classes"
 
   def defaultConfig(): Unit = {
     CarbonEnv.getInstance(sqlContext.sparkSession).carbonSessionInfo.getSessionParams.clear()
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index ce72a1f..3106a38 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -800,7 +800,7 @@ object AlterTableUtil {
   }
 
   private def clearCache(carbonTable: CarbonTable): Unit = {
-    // clear dataMap cache
+    // clear index cache
     IndexStoreManager.getInstance().clearIndex(carbonTable.getAbsoluteTableIdentifier)
     // clear segmentProperties Cache
     SegmentPropertiesAndSchemaHolder.getInstance()
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
index cb7c7b3..d5f4f1a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexFunctionSuite.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.index.status.IndexStatus
 import org.apache.carbondata.core.metadata.index.IndexType
 
 class BloomCoarseGrainIndexFunctionSuite  extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
-  val bigFile = s"$resourcesPath/bloom_datamap_function_test_big.csv"
+  val bigFile = s"$resourcesPath/bloom_index_function_test_big.csv"
   val normalTable = "carbon_normal"
   val bloomSampleTable = "carbon_bloom"
   val indexName = "bloom_dm"
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
index d9135d6..67e62de 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
@@ -34,8 +34,8 @@ import org.apache.carbondata.core.util.CarbonProperties
 
 class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   val carbonSession = sqlContext.sparkSession
-  val bigFile = s"$resourcesPath/bloom_datamap_input_big.csv"
-  val smallFile = s"$resourcesPath/bloom_datamap_input_small.csv"
+  val bigFile = s"$resourcesPath/bloom_index_input_big.csv"
+  val smallFile = s"$resourcesPath/bloom_index_input_small.csv"
   val normalTable = "carbon_normal"
   val bloomSampleTable = "carbon_bloom"
   val indexName = "bloom_dm"
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneCoarseGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneCoarseGrainIndexSuite.scala
index 78d651c..5a67e21 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneCoarseGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneCoarseGrainIndexSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, Ignore}
 @Ignore
 class LuceneCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
 
-  val file2 = resourcesPath + "/datamap_input.csv"
+  val file2 = resourcesPath + "/index_input.csv"
 
   override protected def beforeAll(): Unit = {
     //n should be about 5000000 of reset if size is default 1024
@@ -42,11 +42,11 @@ class LuceneCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')")
   }
 
-  test("test lucene coarse grain data map") {
-    sql("DROP TABLE IF EXISTS datamap_test")
+  test("test lucene coarse grain index") {
+    sql("DROP TABLE IF EXISTS index_test")
     sql(
       """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | CREATE TABLE index_test(id INT, name STRING, city STRING, age INT)
         | STORED AS carbondata
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
@@ -54,20 +54,20 @@ class LuceneCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""
          | CREATE INDEX dm
-         | ON datamap_test (name, city)
+         | ON index_test (name, city)
          | AS 'lucene'
       """.stripMargin)
 
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE index_test OPTIONS('header'='false')")
 
-    checkAnswer(sql("select * from datamap_test where name='n502670'"),
+    checkAnswer(sql("select * from index_test where name='n502670'"),
       sql("select * from normal_test where name='n502670'"))
   }
 
   override protected def afterAll(): Unit = {
     LuceneFineGrainIndexSuite.deleteFile(file2)
     sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
+    sql("DROP TABLE IF EXISTS index_test")
   }
 
 }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
index 24b59b5..917ebf3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/lucene/LuceneFineGrainIndexSuite.scala
@@ -39,7 +39,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     CarbonCommonConstants.USE_DISTRIBUTED_INDEX,
     CarbonCommonConstants.USE_DISTRIBUTED_INDEX_DEFAULT
   )
-  val file2 = resourcesPath + "/datamap_input.csv"
+  val file2 = resourcesPath + "/index_input.csv"
 
   override protected def beforeAll(): Unit = {
     sql("drop database if exists lucene cascade")
@@ -102,7 +102,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     assertResult("Only String column is supported, column 'id' is INT type. ")(exception.getMessage)
   }
 
-  test("test lucene fine grain data map") {
+  test("test lucene fine grain index") {
     sql("drop index if exists dm on table index_test")
     sql(
       s"""
@@ -143,7 +143,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     assert(exception.getMessage.contains("Non-lazy index index1 does not support manual refresh"))
   }
 
-  ignore("test lucene rebuild data map") {
+  ignore("test lucene rebuild index") {
     sql("DROP TABLE IF EXISTS index_test4")
     sql(
       """
@@ -169,7 +169,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop table index_test4")
   }
 
-  test("test lucene fine grain data map drop") {
+  test("test lucene fine grain index drop") {
     sql("DROP TABLE IF EXISTS index_test1")
     sql(
       """
@@ -199,7 +199,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS index_test1")
   }
 
-  test("test lucene fine grain data map show") {
+  test("test lucene fine grain index show") {
     sql("DROP TABLE IF EXISTS index_test2")
     sql("DROP TABLE IF EXISTS index_test3")
     sql(
@@ -237,7 +237,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS index_test3")
   }
 
-  test("test lucene fine grain data map with wildcard matching ") {
+  test("test lucene fine grain index with wildcard matching ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -259,7 +259,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm on table index_test_table")
   }
 
-  test("test lucene fine grain data map with TEXT_MATCH 'AND' Filter ") {
+  test("test lucene fine grain index with TEXT_MATCH 'AND' Filter ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -280,7 +280,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm on table index_test_table")
   }
 
-  test("test lucene fine grain data map with TEXT_MATCH 'OR' Filter ") {
+  test("test lucene fine grain index with TEXT_MATCH 'OR' Filter ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -301,7 +301,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm on table index_test_table")
   }
 
-  test("test lucene fine grain data map with TEXT_MATCH 'AND' and 'OR' Filter ") {
+  test("test lucene fine grain index with TEXT_MATCH 'AND' and 'OR' Filter ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -324,7 +324,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm on table index_test_table")
   }
 
-  test("test lucene fine grain data map with compaction-Major ") {
+  test("test lucene fine grain index with compaction-Major ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -349,7 +349,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm on table index_test_table")
   }
 
-  test("test lucene fine grain data map with compaction-Minor ") {
+  test("test lucene fine grain index with compaction-Minor ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -375,7 +375,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm on table index_test_table")
   }
 
-  test("test lucene fine grain data map with GLOBAL_SORT_SCOPE ") {
+  test("test lucene fine grain index with GLOBAL_SORT_SCOPE ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -423,7 +423,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm2 on table index_test_table")
   }
 
-  test("test lucene fine grain data map with TEXT_MATCH 'NOT' Filter ") {
+  test("test lucene fine grain index with TEXT_MATCH 'NOT' Filter ") {
     sql("DROP TABLE IF EXISTS index_test_table")
     sql(
       """
@@ -452,7 +452,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists dm on table index_test_table")
   }
 
-  test("test lucene fine grain data map with CTAS") {
+  test("test lucene fine grain index with CTAS") {
     sql("DROP TABLE IF EXISTS source_table")
     sql("DROP TABLE IF EXISTS target_table")
     sql(
@@ -481,7 +481,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS target_table")
   }
 
-  test("test lucene fine grain data map with text-match limit") {
+  test("test lucene fine grain index with text-match limit") {
     sql("DROP TABLE IF EXISTS index_test_limit")
     sql(
       """
@@ -502,7 +502,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("drop index dm on table index_test_limit")
   }
 
-  test("test lucene fine grain data map with InsertOverwrite") {
+  test("test lucene fine grain index with InsertOverwrite") {
     sql("DROP TABLE IF EXISTS index_test_overwrite")
     sql(
       """
@@ -628,7 +628,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     assert(ex7.getMessage.contains("alter table column rename is not supported"))
   }
 
-  ignore("test lucene fine grain multiple data map on table") {
+  ignore("test lucene fine grain multiple index on table") {
     sql("DROP TABLE IF EXISTS index_test5")
     sql(
       """
@@ -784,9 +784,9 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
       sql("select * from table_stop where text_match('suggestion:*is*')").collect().length == 1)
   }
 
-  test("test lucene data map on null values") {
+  test("test lucene index on null values") {
     sql("DROP TABLE IF EXISTS index_test4")
-    sql("DROP TABLE IF EXISTS datamap_copy")
+    sql("DROP TABLE IF EXISTS index_copy")
     sql(
       """
         | CREATE TABLE index_test4(id INT, name STRING, city STRING, age INT)
@@ -796,7 +796,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
       """.stripMargin)
     sql(
       """
-        | CREATE TABLE datamap_copy(id INT, name STRING, city STRING, age INT)
+        | CREATE TABLE index_copy(id INT, name STRING, city STRING, age INT)
         | STORED AS carbondata
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT',
         | 'CACHE_LEVEL'='BLOCKLET')
@@ -804,7 +804,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("insert into index_test4 select 1,'name','city',20")
     sql("insert into index_test4 select 2,'name1','city1',20")
     sql("insert into index_test4 select 25,cast(null as string),'city2',NULL")
-    sql("insert into datamap_copy select * from index_test4")
+    sql("insert into index_copy select * from index_test4")
     sql(
       s"""
          | CREATE INDEX dm4
@@ -812,9 +812,9 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
          | AS 'lucene'
       """.stripMargin)
     checkAnswer(sql("SELECT * FROM index_test4 WHERE TEXT_MATCH('name:n*')"),
-      sql(s"select * from datamap_copy where name like '%n%'"))
+      sql(s"select * from index_copy where name like '%n%'"))
     sql("drop table index_test4")
-    sql("drop table datamap_copy")
+    sql("drop table index_copy")
   }
 
   test("test create index: unable to create same index for one column") {
@@ -881,7 +881,7 @@ class LuceneFineGrainIndexSuite extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS index_test4")
     sql("DROP TABLE IF EXISTS index_test5")
     sql("DROP TABLE IF EXISTS index_test7")
-    sql("DROP TABLE IF EXISTS datamap_main")
+    sql("DROP TABLE IF EXISTS index_main")
     sql("DROP TABLE IF EXISTS table_stop")
     sql("use default")
     sql("drop database if exists lucene cascade")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
index 40a9639..59ca8c1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/CGIndexTestCase.scala
@@ -175,7 +175,7 @@ class CGIndex extends CoarseGrainIndex {
   var shardName: String = _
 
   /**
-   * It is called to load the data map to memory or to initialize it.
+   * It is called to load the index into memory or to initialize it.
    */
   override def init(indexModel: IndexModel): Unit = {
     val indexPath = FileFactory.getPath(indexModel.getFilePath)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala
index 4f0b49e..75346e1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/index/FGIndexTestCase.scala
@@ -171,7 +171,7 @@ class FGIndex extends FineGrainIndex {
   var taskName:String = _
 
   /**
-   * It is called to load the data map to memory or to initialize it.
+   * It is called to load the index into memory or to initialize it.
    */
   override def init(indexModel: IndexModel): Unit = {
     this.filePath = indexModel.getFilePath
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestMVTimeSeriesCarbonCreateIndexCommand.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
similarity index 99%
rename from integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestMVTimeSeriesCarbonCreateIndexCommand.scala
rename to integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
index 2572066..57b8ce0 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestMVTimeSeriesCarbonCreateIndexCommand.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/timeseries/TestCreateMVWithTimeSeries.scala
@@ -26,7 +26,7 @@ import org.apache.carbondata.view.rewrite.TestUtil
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-class TestMVTimeSeriesCarbonCreateIndexCommand extends QueryTest with BeforeAndAfterAll {
+class TestCreateMVWithTimeSeries extends QueryTest with BeforeAndAfterAll {
 
   private val timestampFormat = CarbonProperties.getInstance()
     .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
diff --git a/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java
index 4c14223..7f24757 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/index/IndexWriterListener.java
@@ -48,7 +48,7 @@ public class IndexWriterListener {
   private static final Logger LOG = LogServiceFactory.getLogService(
       IndexWriterListener.class.getCanonicalName());
 
-  // list indexed column -> list of data map writer
+  // list of indexed columns -> list of index writers
   private Map<List<CarbonColumn>, List<IndexWriter>> registry = new ConcurrentHashMap<>();
   // table for this listener
   private CarbonTableIdentifier tblIdentifier;
@@ -91,7 +91,7 @@ public class IndexWriterListener {
     assert (segmentId != null);
     IndexMeta meta = factory.getMeta();
     if (meta == null) {
-      // if data map does not have meta, no need to register
+      // if index does not have meta, no need to register
       return;
     }
     List<CarbonColumn> columns = factory.getMeta().getIndexedColumns();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 7759b02..924cd62 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.loading.events;
 import java.util.Map;
 
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
@@ -135,35 +134,6 @@ public class LoadEvents {
     }
   }
 
-  /**
-   * Load Even class will be fired from the Load and compaction class
-   * to creating all the load commands for all preaggregate data map
-   */
-  public static class LoadMetadataEvent extends Event {
-    private CarbonTable carbonTable;
-    private boolean isCompaction;
-    private Map<String, String> options;
-
-    public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction,
-        Map<String, String> options) {
-      this.carbonTable = carbonTable;
-      this.isCompaction = isCompaction;
-      this.options = options;
-    }
-
-    public boolean isCompaction() {
-      return isCompaction;
-    }
-
-    public CarbonTable getCarbonTable() {
-      return carbonTable;
-    }
-
-    public Map<String, String> getOptions() {
-      return options;
-    }
-  }
-
   public static class LoadTablePostStatusUpdateEvent extends Event {
     private CarbonLoadModel carbonLoadModel;
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 0dd17ee..18911f4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -140,7 +140,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
 
   /**
-   * listener to write data map
+   * listener to write index
    */
   protected IndexWriterListener listener;
   /**