Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/03/23 05:54:07 UTC

[carbondata] branch master updated: [CARBONDATA-3718] Support SegmentLevel MinMax for better Pruning and less driver memory usage for cache

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d3204f7  [CARBONDATA-3718] Support SegmentLevel MinMax for better Pruning and less driver memory usage for cache
d3204f7 is described below

commit d3204f7b47ae2a3e1c77748774591a58c3e00757
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Thu Jan 9 15:41:44 2020 +0530

    [CARBONDATA-3718] Support SegmentLevel MinMax for better Pruning and less driver memory usage for cache
    
    Why is this PR needed?
    In cloud scenarios, the index can be too big to fit in the Spark driver, since the VM
    may not have enough memory. Currently, Carbon loads all indexes into the cache on the
    first query. Since the Carbon LRU cache does not support time-based expiration, indexes
    are evicted from the cache on a least-recently-used basis once the cache is full.
    
    In scenarios where a user's table has many segments but queries usually touch only a
    few of them, there is no need to load all indexes into the cache. For filter queries,
    if we prune segments and load only the matched segments into the cache,
    driver memory is saved.
    
    What changes were proposed in this PR?
    Added block-level min/max values, together with column-id and sort_column info, to the
    segment metadata file, so that segments are pruned based on the segment files and the
    index is loaded only for matched segments. Added a configurable carbon property
    'carbon.load.all.segment.indexes.to.cache' that lets users load all indexes into the
    cache if needed. By default, the value is true, which loads all indexes into the cache.
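    
    A minimal sketch (illustrative only, not part of the patch text) of how a user might
    disable loading of all segment indexes through the CarbonProperties API; the property
    key is the one introduced by this commit, while the class and method below are
    hypothetical:
    
        import org.apache.carbondata.core.constants.CarbonCommonConstants;
        import org.apache.carbondata.core.util.CarbonProperties;
    
        public class SegmentIndexCacheConfigExample {
          public static void main(String[] args) {
            // Load block indexes only for segments matched by the filter expression,
            // instead of caching the indexes of every segment in the driver.
            CarbonProperties.getInstance().addProperty(
                CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE, "false");
          }
        }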
    
    Does this PR introduce any user interface change?
    Yes.
    
    Is any new testcase added?
    Yes
    
    This closes #3584
---
 .../core/constants/CarbonCommonConstants.java      |  10 ++
 .../apache/carbondata/core/datamap/Segment.java    |  14 ++
 .../carbondata/core/datamap/TableDataMap.java      |  13 +-
 .../core/datamap/dev/DataMapFactory.java           |   7 +-
 .../core/indexstore/SegmentBlockIndexInfo.java     |  54 ++++++
 .../blockletindex/BlockletDataMapFactory.java      | 196 +++++++++++++++++++--
 .../carbondata/core/metadata/SegmentFileStore.java |  87 ++++++++-
 .../metadata/schema/table/column/ColumnSchema.java |  19 +-
 .../TableStatusReadCommittedScope.java             |   1 +
 .../core/segmentmeta/BlockColumnMetaDataInfo.java  |  64 +++++++
 .../segmentmeta/SegmentColumnMetaDataInfo.java     |  79 +++++++++
 .../core/segmentmeta/SegmentMetaDataInfo.java      |  37 ++++
 .../core/segmentmeta/SegmentMetaDataInfoStats.java | 167 ++++++++++++++++++
 docs/configuration-parameters.md                   |   1 +
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  39 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala           |  88 ++++++---
 .../carbondata/spark/rdd/CarbonIUDMergerRDD.scala  |   8 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |  46 ++---
 .../spark/rdd/CarbonTableCompactor.scala           |  42 ++++-
 .../rdd/CompactionTaskCompletionListener.scala     |  77 ++++++++
 .../spark/rdd/InsertTaskCompletionListener.scala   |  19 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala           |  24 ++-
 .../carbondata/spark/rdd/UpdateDataLoad.scala      |  11 +-
 .../CarbonTaskCompletionListener.scala             |   5 +
 .../management/CarbonInsertFromStageCommand.scala  |  15 +-
 .../command/management/CommonLoadUtils.scala       |  79 ++++++++-
 .../secondaryindex/util/SecondaryIndexUtil.scala   |   1 +
 .../allqueries/TestPruneUsingSegmentMinMax.scala   | 121 +++++++++++++
 .../store/writer/AbstractFactDataWriter.java       |  17 ++
 29 files changed, 1215 insertions(+), 126 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index baf5a37..fbfacec 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2421,5 +2421,15 @@ public final class CarbonCommonConstants {
    */
   public static final int INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS_DEFAULT = Integer.MAX_VALUE;
 
+  /**
+   * Load all indexes to carbon LRU cache
+   */
+  public static final String CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE =
+      "carbon.load.all.segment.indexes.to.cache";
 
+  /**
+   * Default value for loading cache is true
+   * Set this to false to load the index only for segments matched by the filter expression
+   */
+  public static final String CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT = "true";
 }
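
For reference, the constant added above is consumed later in this patch (in
BlockletDataMapFactory) roughly as follows; a simplified Java sketch of that existing
logic, not an additional change:

    boolean isLoadAllIndex = Boolean.parseBoolean(CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
            CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT));
    if (!isLoadAllIndex) {
      // prune segments using segment-level min/max before loading block indexes to cache
    }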
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 708dc1d..9849df2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import org.apache.carbondata.core.metadata.schema.table.Writable;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
@@ -85,6 +86,11 @@ public class Segment implements Serializable, Writable {
    */
   private transient Map<String, String> options;
 
+  /**
+   * Segment metadata info
+   */
+  private SegmentMetaDataInfo segmentMetaDataInfo;
+
   public Segment() {
 
   }
@@ -376,4 +382,12 @@ public class Segment implements Serializable, Writable {
     }
     this.indexSize = in.readLong();
   }
+
+  public SegmentMetaDataInfo getSegmentMetaDataInfo() {
+    return segmentMetaDataInfo;
+  }
+
+  public void setSegmentMetaDataInfo(SegmentMetaDataInfo segmentMetaDataInfo) {
+    this.segmentMetaDataInfo = segmentMetaDataInfo;
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 62424fe..8d680b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -120,10 +120,11 @@ public final class TableDataMap extends OperationEventListener {
     final List<ExtendedBlocklet> blocklets = new ArrayList<>();
     List<Segment> segments = getCarbonSegments(allsegments);
     final Map<Segment, List<DataMap>> dataMaps;
-    if (table.isHivePartitionTable() && filter != null && !filter.isEmpty() && partitions != null) {
-      dataMaps = dataMapFactory.getDataMaps(segments, partitions);
+    boolean isFilterPresent = filter != null && !filter.isEmpty();
+    if (table.isHivePartitionTable() && isFilterPresent && partitions != null) {
+      dataMaps = dataMapFactory.getDataMaps(segments, partitions, filter);
     } else {
-      dataMaps = dataMapFactory.getDataMaps(segments);
+      dataMaps = dataMapFactory.getDataMaps(segments, filter);
     }
 
     if (dataMaps.isEmpty()) {
@@ -133,9 +134,9 @@ public final class TableDataMap extends OperationEventListener {
     // for filter queries
     int totalFiles = 0;
     int datamapsCount = 0;
-    // In case if filter has matched partitions, then update the segments with datamap's
-    // segment list, as getDataMaps will return segments that matches the partition.
-    if (null != partitions && !partitions.isEmpty()) {
+    // In case if filter is present, then update the segments with datamap's segment list
+    // based on segment or partition pruning
+    if (isFilterPresent) {
       segments = new ArrayList<>(dataMaps.keySet());
     }
     for (Segment segment : segments) {
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 6296bf8..711c495 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
@@ -89,8 +90,8 @@ public abstract class DataMapFactory<T extends DataMap> {
   /**
    * Get the datamap for all segments
    */
-  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments)
-      throws IOException {
+  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments,
+      DataMapFilter filter) throws IOException {
     Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
     for (Segment segment : segments) {
       dataMaps.put(segment, (List<CoarseGrainDataMap>) this.getDataMaps(segment));
@@ -103,7 +104,7 @@ public abstract class DataMapFactory<T extends DataMap> {
    * matches the partition.
    */
   public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments,
-      List<PartitionSpec> partitionSpecs) throws IOException {
+      List<PartitionSpec> partitionSpecs, DataMapFilter dataMapFilter) throws IOException {
     Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
     for (Segment segment : segments) {
       dataMaps.put(segment, (List<CoarseGrainDataMap>) this.getDataMaps(segment, partitionSpecs));
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java
new file mode 100644
index 0000000..95bcf1f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentBlockIndexInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.indexstore;
+
+import java.util.Set;
+
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
+
+/**
+ * Holds tableBlockUniqueIdentifiers and block level minMax values for the segment
+ */
+public class SegmentBlockIndexInfo {
+
+  /**
+   * IndexFile's information for the segment
+   */
+  private Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers;
+
+  /**
+   * segment metadata info
+   */
+  private SegmentMetaDataInfo segmentMetaDataInfo;
+
+  public SegmentBlockIndexInfo(
+      Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers,
+      SegmentMetaDataInfo segmentMetaDataInfo) {
+    this.tableBlockIndexUniqueIdentifiers = tableBlockIndexUniqueIdentifiers;
+    this.segmentMetaDataInfo = segmentMetaDataInfo;
+  }
+
+  public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers() {
+    return tableBlockIndexUniqueIdentifiers;
+  }
+
+  public SegmentMetaDataInfo getSegmentMetaDataInfo() {
+    return segmentMetaDataInfo;
+  }
+
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index b0f0214..44cc6da 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore.blockletindex;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -26,11 +27,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapFilter;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
@@ -42,6 +46,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactor
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.features.TableOperation;
@@ -50,14 +55,21 @@ import org.apache.carbondata.core.indexstore.BlockletDataMapIndexWrapper;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.SegmentBlockIndexInfo;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifierWrapper;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
 import org.apache.carbondata.core.util.BlockletDataMapUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -79,7 +91,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private AbsoluteTableIdentifier identifier;
 
   // segmentId -> list of index file
-  private Map<String, Set<TableBlockIndexUniqueIdentifier>> segmentMap = new ConcurrentHashMap<>();
+  private Map<String, SegmentBlockIndexInfo> segmentMap = new ConcurrentHashMap<>();
 
   private Cache<TableBlockIndexUniqueIdentifierWrapper, BlockletDataMapIndexWrapper> cache;
 
@@ -122,16 +134,16 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   /**
    * Get the datamap for all segments
    */
-  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments)
-      throws IOException {
-    return getDataMaps(segments, null);
+  public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments,
+      DataMapFilter filter) throws IOException {
+    return getDataMaps(segments, null, filter);
   }
 
   /**
    * Get the datamap for all segments
    */
   public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
+      List<PartitionSpec> partitionsToPrune, DataMapFilter filter) throws IOException {
     List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers =
         new ArrayList<>();
     Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>();
@@ -140,9 +152,28 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       segmentMap.put(segment.getSegmentNo(), segment);
       Set<TableBlockIndexUniqueIdentifier> identifiers =
           getTableBlockIndexUniqueIdentifiers(segment);
-      // get tableBlockIndexUniqueIdentifierWrappers from segment file info
-      getTableBlockUniqueIdentifierWrappers(partitionsToPrune,
-          tableBlockIndexUniqueIdentifierWrappers, identifiers);
+      if (null != partitionsToPrune) {
+        // get tableBlockIndexUniqueIdentifierWrappers from segment file info
+        getTableBlockUniqueIdentifierWrappers(partitionsToPrune,
+            tableBlockIndexUniqueIdentifierWrappers, identifiers);
+      } else {
+        SegmentMetaDataInfo segmentMetaDataInfo = segment.getSegmentMetaDataInfo();
+        boolean isLoadAllIndex = Boolean.parseBoolean(CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
+                CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT));
+        if (!isLoadAllIndex && null != segmentMetaDataInfo && null != filter && !filter.isEmpty()
+            && null != filter.getExpression() && null == FilterUtil
+            .getImplicitFilterExpression(filter.getExpression())) {
+          getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(segment, segmentMetaDataInfo, filter,
+              identifiers, tableBlockIndexUniqueIdentifierWrappers);
+        } else {
+          for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
+            tableBlockIndexUniqueIdentifierWrappers.add(
+                new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+                    this.getCarbonTable()));
+          }
+        }
+      }
     }
     List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers =
         cache.getAll(tableBlockIndexUniqueIdentifierWrappers);
@@ -186,6 +217,115 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     }
   }
 
+  /**
+   * Using blockLevel minmax values, identify if segment has to be added for further pruning and to
+   * load segment index info to cache
+   * @param segment to be identified if needed for loading block datamaps
+   * @param segmentMetaDataInfo list of block level min max values
+   * @param filter filter expression
+   * @param identifiers tableBlockIndexUniqueIdentifiers
+   * @param tableBlockIndexUniqueIdentifierWrappers to add tableBlockIndexUniqueIdentifiers
+   */
+  private void getTableBlockIndexUniqueIdentifierUsingSegmentMinMax(Segment segment,
+      SegmentMetaDataInfo segmentMetaDataInfo, DataMapFilter filter,
+      Set<TableBlockIndexUniqueIdentifier> identifiers,
+      List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers) {
+    boolean isScanRequired = false;
+    Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap =
+        segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap();
+    int length = segmentColumnMetaDataInfoMap.size();
+    // Add columnSchemas based on the columns present in segment
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    byte[][] min = new byte[length][];
+    byte[][] max = new byte[length][];
+    boolean[] minMaxFlag = new boolean[length];
+    int i = 0;
+
+    // get current columnSchema list for the table
+    Map<String, ColumnSchema> tableColumnSchemas =
+        this.getCarbonTable().getTableInfo().getFactTable().getListOfColumns().stream()
+            .collect(Collectors.toMap(ColumnSchema::getColumnUniqueId, ColumnSchema::clone));
+
+    // fill min,max and columnSchema values
+    for (Map.Entry<String, SegmentColumnMetaDataInfo> columnMetaData :
+        segmentColumnMetaDataInfoMap.entrySet()) {
+      ColumnSchema columnSchema = tableColumnSchemas.get(columnMetaData.getKey());
+      if (null != columnSchema) {
+        // get segment sort column and column drift info
+        boolean isSortColumnInSegment = columnMetaData.getValue().isSortColumn();
+        boolean isColumnDriftInSegment = columnMetaData.getValue().isColumnDrift();
+        if (null != columnSchema.getColumnProperties()) {
+          // get current sort column and column drift info from current columnSchema
+          String isSortColumn =
+              columnSchema.getColumnProperties().get(CarbonCommonConstants.SORT_COLUMNS);
+          String isColumnDrift =
+              columnSchema.getColumnProperties().get(CarbonCommonConstants.COLUMN_DRIFT);
+          if (null != isSortColumn) {
+            if (isSortColumn.equalsIgnoreCase("true") && !isSortColumnInSegment) {
+              // Unset current column schema column properties
+              modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
+                  false);
+            } else if (isSortColumn.equalsIgnoreCase("false") && isSortColumnInSegment) {
+              // set sort column to true in current column schema column properties
+              modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
+                  true);
+            }
+          } else {
+            modifyColumnSchemaForSortColumn(columnSchema, isColumnDriftInSegment, isColumnDrift,
+                false);
+          }
+        }
+        columnSchemas.add(columnSchema);
+        min[i] = columnMetaData.getValue().getColumnMinValue();
+        max[i] = columnMetaData.getValue().getColumnMaxValue();
+        minMaxFlag[i] = min[i].length != 0 && max[i].length != 0;
+        i++;
+      }
+    }
+    // get segmentProperties using created columnSchemas list
+    SegmentProperties segmentProperties = SegmentPropertiesAndSchemaHolder.getInstance()
+        .addSegmentProperties(this.getCarbonTable(), columnSchemas, segment.getSegmentNo())
+        .getSegmentProperties();
+
+    FilterResolverIntf resolver =
+        new DataMapFilter(segmentProperties, this.getCarbonTable(), filter.getExpression())
+            .getResolver();
+    // prepare filter executer using dataMapFilter resolver
+    FilterExecuter filterExecuter =
+        FilterUtil.getFilterExecuterTree(resolver, segmentProperties, null, null, false);
+    // check if block has to be pruned based on segment minmax
+    BitSet scanRequired = filterExecuter.isScanRequired(max, min, minMaxFlag);
+    if (!scanRequired.isEmpty()) {
+      isScanRequired = true;
+    }
+    if (isScanRequired) {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) {
+        tableBlockIndexUniqueIdentifierWrappers.add(
+            new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
+                this.getCarbonTable()));
+      }
+    }
+  }
+
+  private void modifyColumnSchemaForSortColumn(ColumnSchema columnSchema, boolean columnDrift,
+      String isColumnDrift, boolean isSortColumnInSegment) {
+    if (!isSortColumnInSegment) {
+      if (null != isColumnDrift && isColumnDrift.equalsIgnoreCase("true") && !columnDrift) {
+        columnSchema.setDimensionColumn(false);
+      }
+      columnSchema.setSortColumn(false);
+      columnSchema.getColumnProperties().clear();
+    } else {
+      // modify column schema, if current columnSchema is changed
+      columnSchema.setSortColumn(true);
+      if (!columnSchema.isDimensionColumn()) {
+        columnSchema.setDimensionColumn(true);
+        columnSchema.getColumnProperties().put(CarbonCommonConstants.COLUMN_DRIFT, "true");
+      }
+      columnSchema.getColumnProperties().put(CarbonCommonConstants.SORT_COLUMNS, "true");
+    }
+  }
+
   @Override
   public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
     return getDataMaps(segment, null);
@@ -211,13 +351,19 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   public Set<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
       throws IOException {
-    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        segmentMap.get(segment.getSegmentNo());
-    if (tableBlockIndexUniqueIdentifiers == null) {
+    SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.get(segment.getSegmentNo());
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = null;
+    if (null != segmentBlockIndexInfo) {
+      segment.setSegmentMetaDataInfo(
+          segmentMap.get(segment.getSegmentNo()).getSegmentMetaDataInfo());
+      return segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
+    } else {
       tableBlockIndexUniqueIdentifiers =
           BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
       if (tableBlockIndexUniqueIdentifiers.size() > 0) {
-        segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+        segmentMap.put(segment.getSegmentNo(),
+            new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers,
+                segment.getSegmentMetaDataInfo()));
       }
     }
     return tableBlockIndexUniqueIdentifiers;
@@ -319,7 +465,11 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   @Override
   public void clear(String segment) {
-    Set<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment);
+    SegmentBlockIndexInfo segmentBlockIndexInfo = segmentMap.remove(segment);
+    Set<TableBlockIndexUniqueIdentifier> blockIndexes = null;
+    if (null != segmentBlockIndexInfo) {
+      blockIndexes = segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
+    }
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
         TableBlockIndexUniqueIdentifierWrapper blockIndexWrapper =
@@ -351,8 +501,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   public String getCacheSize() {
     long sum = 0L;
     int numOfIndexFiles = 0;
-    for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> entry : segmentMap.entrySet()) {
-      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue()) {
+    for (Map.Entry<String, SegmentBlockIndexInfo> entry : segmentMap.entrySet()) {
+      for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : entry.getValue()
+          .getTableBlockIndexUniqueIdentifiers()) {
         BlockletDataMapIndexWrapper blockletDataMapIndexWrapper = cache.getIfPresent(
             new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
                 getCarbonTable()));
@@ -392,8 +543,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private List<TableBlockIndexUniqueIdentifierWrapper> getTableBlockIndexUniqueIdentifier(
       DataMapDistributable distributable) throws IOException {
     List<TableBlockIndexUniqueIdentifierWrapper> identifiersWrapper = new ArrayList<>();
-    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
+    SegmentBlockIndexInfo segmentBlockIndexInfo =
         segmentMap.get(distributable.getSegment().getSegmentNo());
+    Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = null;
+    if (null != segmentBlockIndexInfo) {
+      tableBlockIndexUniqueIdentifiers =
+          segmentBlockIndexInfo.getTableBlockIndexUniqueIdentifiers();
+    }
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new HashSet<>();
       Set<String> indexFiles = distributable.getSegment().getCommittedIndexFile().keySet();
@@ -417,7 +573,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
                 this.getCarbonTable()));
         tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
       }
-      segmentMap.put(distributable.getSegment().getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+      segmentMap.put(distributable.getSegment().getSegmentNo(),
+          new SegmentBlockIndexInfo(tableBlockIndexUniqueIdentifiers,
+              distributable.getSegment().getSegmentMetaDataInfo()));
     } else {
       for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
           tableBlockIndexUniqueIdentifiers) {
@@ -539,7 +697,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   private Set<TableBlockIndexUniqueIdentifier> getTableSegmentUniqueIdentifiers(Segment segment)
       throws IOException {
     Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        segmentMap.get(segment.getSegmentNo());
+        segmentMap.get(segment.getSegmentNo()).getTableBlockIndexUniqueIdentifiers();
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = BlockletDataMapUtil.getSegmentUniqueIdentifiers(segment);
     }
@@ -550,7 +708,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       Map<String, Set<TableBlockIndexUniqueIdentifier>> indexUniqueIdentifiers) {
     for (Map.Entry<String, Set<TableBlockIndexUniqueIdentifier>> identifier : indexUniqueIdentifiers
         .entrySet()) {
-      segmentMap.put(identifier.getKey(), identifier.getValue());
+      segmentMap.put(identifier.getKey(), new SegmentBlockIndexInfo(identifier.getValue(), null));
     }
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index c42fa7f..af5724b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -54,12 +54,16 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.segmentmeta.SegmentColumnMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfoStats;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
@@ -180,11 +184,17 @@ public class SegmentFileStore {
    * @param carbonTable CarbonTable
    * @param segmentId segment id
    * @param UUID      a UUID string used to construct the segment file name
+   * @param segmentMetaDataInfo list of block level min and max values for segment
    * @return segment file name
    */
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
+      SegmentMetaDataInfo segmentMetaDataInfo) throws IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, segmentMetaDataInfo);
+  }
+
   public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID)
       throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null);
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, null);
   }
 
   /**
@@ -193,12 +203,18 @@ public class SegmentFileStore {
    * @param carbonTable CarbonTable
    * @param segmentId segment id
    * @param UUID      a UUID string used to construct the segment file name
+   * @param segPath segment path
+   * @param segmentMetaDataInfo segment metadata info
    * @return segment file name
    */
   public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
-      String segPath)
-      throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath);
+      String segPath, SegmentMetaDataInfo segmentMetaDataInfo) throws IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, segmentMetaDataInfo);
+  }
+
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
+      String segPath) throws IOException {
+    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, null);
   }
 
   /**
@@ -315,7 +331,8 @@ public class SegmentFileStore {
    * @throws IOException
    */
   public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
-      final String currentLoadTimeStamp, String absSegPath) throws IOException {
+      final String currentLoadTimeStamp, String absSegPath, SegmentMetaDataInfo segmentMetaDataInfo)
+      throws IOException {
     String tablePath = carbonTable.getTablePath();
     boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
     String segmentPath = absSegPath;
@@ -356,6 +373,10 @@ public class SegmentFileStore {
         }
       }
       segmentFile.addPath(segmentRelativePath, folderDetails);
+      // set segmentMinMax to segmentFile
+      if (null != segmentMetaDataInfo) {
+        segmentFile.setSegmentMetaDataInfo(segmentMetaDataInfo);
+      }
       String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
       CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
       if (!carbonFile.exists()) {
@@ -582,7 +603,7 @@ public class SegmentFileStore {
    * @param partitionSpecs
    */
   public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
-      List<PartitionSpec> partitionSpecs) {
+      List<PartitionSpec> partitionSpecs) throws IOException {
     SegmentFile segmentFile = null;
     for (PartitionSpec spec : partitionSpecs) {
       String location = spec.getLocation().toString();
@@ -1223,11 +1244,16 @@ public class SegmentFileStore {
      */
     private Map<String, String> options;
 
+    /**
+     * Segment metadata information such as column_id, min-max, alter properties
+     */
+    private String segmentMetaDataInfo;
+
     SegmentFile() {
       locationMap = new HashMap<>();
     }
 
-    public SegmentFile merge(SegmentFile segmentFile) {
+    public SegmentFile merge(SegmentFile segmentFile) throws IOException {
       if (this == segmentFile) {
         return this;
       }
@@ -1240,6 +1266,37 @@ public class SegmentFileStore {
             locationMap.put(entry.getKey(), entry.getValue());
           }
         }
+        if (segmentMetaDataInfo != null) {
+          SegmentMetaDataInfo currentSegmentMetaDataInfo =
+              (SegmentMetaDataInfo) ObjectSerializationUtil
+                  .convertStringToObject(segmentMetaDataInfo);
+          if (null != segmentFile.getSegmentMetaDataInfo()) {
+            // get updated segmentColumnMetaDataInfo based on comparing block min-max values
+            Map<String, SegmentColumnMetaDataInfo> previousBlockColumnMetaDataInfo =
+                segmentFile.getSegmentMetaDataInfo().getSegmentColumnMetaDataInfoMap();
+            for (Map.Entry<String, SegmentColumnMetaDataInfo> entry :
+                previousBlockColumnMetaDataInfo.entrySet()) {
+              if (currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap()
+                  .containsKey(entry.getKey())) {
+                SegmentColumnMetaDataInfo currentBlockMinMaxInfo =
+                    currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap()
+                        .get(entry.getKey());
+                byte[] blockMaxValue = SegmentMetaDataInfoStats.getInstance()
+                    .compareAndUpdateMinMax(currentBlockMinMaxInfo.getColumnMaxValue(),
+                        entry.getValue().getColumnMaxValue(), false);
+                byte[] blockMinValue = SegmentMetaDataInfoStats.getInstance()
+                    .compareAndUpdateMinMax(currentBlockMinMaxInfo.getColumnMinValue(),
+                        entry.getValue().getColumnMinValue(), true);
+                currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap().get(entry.getKey())
+                    .setColumnMaxValue(blockMaxValue);
+                currentSegmentMetaDataInfo.getSegmentColumnMetaDataInfoMap().get(entry.getKey())
+                    .setColumnMinValue(blockMinValue);
+              }
+            }
+          }
+          segmentMetaDataInfo =
+              ObjectSerializationUtil.convertObjectToString(currentSegmentMetaDataInfo);
+        }
       }
       if (locationMap == null) {
         locationMap = segmentFile.locationMap;
@@ -1265,6 +1322,22 @@ public class SegmentFileStore {
     public void setOptions(Map<String, String> options) {
       this.options = options;
     }
+
+    public SegmentMetaDataInfo getSegmentMetaDataInfo() {
+      SegmentMetaDataInfo newSegmentMetaDataInfo = null;
+      try {
+        newSegmentMetaDataInfo = (SegmentMetaDataInfo) ObjectSerializationUtil
+            .convertStringToObject(segmentMetaDataInfo);
+      } catch (IOException e) {
+        LOGGER.error("Error while getting segment metadata info");
+      }
+      return newSegmentMetaDataInfo;
+    }
+
+    public void setSegmentMetaDataInfo(SegmentMetaDataInfo segmentMetaDataInfo) throws IOException {
+      this.segmentMetaDataInfo = ObjectSerializationUtil.convertObjectToString(
+          segmentMetaDataInfo);
+    }
   }
 
   public static SegmentFile createSegmentFile(String partitionPath, FolderDetails folderDetails) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index 933898c..b21add4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -17,8 +17,12 @@
 
 package org.apache.carbondata.core.metadata.schema.table.column;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -37,7 +41,7 @@ import org.apache.carbondata.core.preagg.TimeSeriesUDF;
 /**
  * Store the information about the column meta data present the table
  */
-public class ColumnSchema implements Serializable, Writable {
+public class ColumnSchema implements Serializable, Writable, Cloneable {
 
   /**
    * serialization version
@@ -600,4 +604,17 @@ public class ColumnSchema implements Serializable, Writable {
   public void setIndexColumn(boolean indexColumn) {
     this.indexColumn = indexColumn;
   }
+
+  public ColumnSchema clone() {
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos)) {
+      this.write(dos);
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
+      ColumnSchema columnSchema = (ColumnSchema) super.clone();
+      columnSchema.readFields(dis);
+      return columnSchema;
+    } catch (IOException | CloneNotSupportedException e) {
+      throw new RuntimeException("Error occur while cloning ColumnSchema", e);
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 7e59156..5d4ed4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -86,6 +86,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
       SegmentFileStore fileStore =
           new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexOrMergeFiles();
+      segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
     }
     return indexFiles;
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java
new file mode 100644
index 0000000..c138243
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/BlockColumnMetaDataInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.carbondata.format.ColumnSchema;
+
+/**
+ * Holds columnSchemas and updated min-max values for all columns in a segment
+ */
+public class BlockColumnMetaDataInfo implements Serializable {
+
+  private List<org.apache.carbondata.format.ColumnSchema> columnSchemas;
+
+  private byte[][] min;
+
+  private byte[][] max;
+
+  public BlockColumnMetaDataInfo(List<org.apache.carbondata.format.ColumnSchema> columnSchemas,
+      byte[][] min, byte[][] max) {
+    this.columnSchemas = columnSchemas;
+    this.min = min;
+    this.max = max;
+  }
+
+  public byte[][] getMin() {
+    return min;
+  }
+
+  public void setMinMax(byte[][] min, byte[][] max) {
+    this.min = min;
+    this.max = max;
+  }
+
+  public byte[][] getMax() {
+    return max;
+  }
+
+  public List<ColumnSchema> getColumnSchemas() {
+    return columnSchemas;
+  }
+
+  public void setColumnSchemas(List<ColumnSchema> columnSchemas) {
+    this.columnSchemas = columnSchemas;
+  }
+}
+
diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java
new file mode 100644
index 0000000..3536411
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentColumnMetaDataInfo.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.io.Serializable;
+
+/**
+ * Represent segment level column metadata information
+ */
+public class SegmentColumnMetaDataInfo implements Serializable {
+
+  /**
+   * true if column is a sort column
+   */
+  private boolean isSortColumn;
+
+  /**
+   * segment level  min value for a column
+   */
+  private byte[] columnMinValue;
+
+  /**
+   * segment level max value for a column
+   */
+  private byte[] columnMaxValue;
+
+  /**
+   * true if the column has drifted from measure to dimension
+   */
+  private boolean isColumnDrift;
+
+  public SegmentColumnMetaDataInfo(boolean isSortColumn, byte[] columnMinValue,
+      byte[] columnMaxValue, boolean isColumnDrift) {
+    this.isSortColumn = isSortColumn;
+    this.columnMinValue = columnMinValue;
+    this.columnMaxValue = columnMaxValue;
+    this.isColumnDrift = isColumnDrift;
+  }
+
+  public boolean isSortColumn() {
+    return isSortColumn;
+  }
+
+  public byte[] getColumnMinValue() {
+    return columnMinValue;
+  }
+
+  public byte[] getColumnMaxValue() {
+    return columnMaxValue;
+  }
+
+  public boolean isColumnDrift() {
+    return isColumnDrift;
+  }
+
+  public void setColumnMinValue(byte[] columnMinValue) {
+    this.columnMinValue = columnMinValue;
+  }
+
+  public void setColumnMaxValue(byte[] columnMaxValue) {
+    this.columnMaxValue = columnMaxValue;
+  }
+}
+
diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java
new file mode 100644
index 0000000..721522f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Represent segment metadata information
+ */
+public class SegmentMetaDataInfo implements Serializable {
+
+  private Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap;
+
+  SegmentMetaDataInfo(Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap) {
+    this.segmentColumnMetaDataInfoMap = segmentColumnMetaDataInfoMap;
+  }
+
+  public Map<String, SegmentColumnMetaDataInfo> getSegmentColumnMetaDataInfoMap() {
+    return segmentColumnMetaDataInfoMap;
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
new file mode 100644
index 0000000..704df72
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.segmentmeta;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Holds segment level meta data information such as min,max, sortColumn info for the
+ * corresponding table
+ */
+public class SegmentMetaDataInfoStats {
+
+  private SegmentMetaDataInfoStats() {
+    tableSegmentMetaDataInfoMap = new LinkedHashMap<>();
+  }
+
+  public static synchronized SegmentMetaDataInfoStats getInstance() {
+    if (null == segmentMetaDataInfoStats) {
+      segmentMetaDataInfoStats = new SegmentMetaDataInfoStats();
+      return segmentMetaDataInfoStats;
+    } else {
+      return segmentMetaDataInfoStats;
+    }
+  }
+
+  private Map<String, Map<String, BlockColumnMetaDataInfo>> tableSegmentMetaDataInfoMap;
+
+  private static SegmentMetaDataInfoStats segmentMetaDataInfoStats;
+
+  /**
+   * Prepare a map with column-id as key and SegmentColumnMetaDataInfo as value, using the
+   * tableSegmentMetaDataInfoMap
+   *
+   * @param tableName get corresponding tableName from map
+   * @param segmentId get corresponding segment Id from map
+   * @return segmentMetaDataInfo for the corresponding segment
+   */
+  public synchronized SegmentMetaDataInfo getTableSegmentMetaDataInfo(String tableName,
+      String segmentId) {
+    Map<String, SegmentColumnMetaDataInfo> segmentColumnMetaDataInfoMap = new LinkedHashMap<>();
+    Map<String, BlockColumnMetaDataInfo> segmentMetaDataInfoMap =
+        this.tableSegmentMetaDataInfoMap.get(tableName);
+    if (null != segmentMetaDataInfoMap && !segmentMetaDataInfoMap.isEmpty()
+        && null != segmentMetaDataInfoMap.get(segmentId)) {
+      BlockColumnMetaDataInfo blockColumnMetaDataInfo = segmentMetaDataInfoMap.get(segmentId);
+      for (int i = 0; i < blockColumnMetaDataInfo.getColumnSchemas().size(); i++) {
+        org.apache.carbondata.format.ColumnSchema columnSchema =
+            blockColumnMetaDataInfo.getColumnSchemas().get(i);
+        boolean isSortColumn = false;
+        boolean isColumnDrift = false;
+        if (null != columnSchema.columnProperties && !columnSchema.columnProperties.isEmpty()) {
+          if (null != columnSchema.columnProperties.get(CarbonCommonConstants.SORT_COLUMNS)) {
+            isSortColumn = true;
+          }
+          if (null != columnSchema.columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT)) {
+            isColumnDrift = true;
+          }
+        }
+        segmentColumnMetaDataInfoMap.put(columnSchema.column_id,
+            new SegmentColumnMetaDataInfo(isSortColumn, blockColumnMetaDataInfo.getMin()[i],
+                blockColumnMetaDataInfo.getMax()[i], isColumnDrift));
+      }
+    }
+    return new SegmentMetaDataInfo(segmentColumnMetaDataInfoMap);
+  }
+
+  public synchronized void setBlockMetaDataInfo(String tableName, String segmentId,
+      BlockColumnMetaDataInfo currentBlockColumnMetaInfo) {
+    // check if tableName is present in tableSegmentMetaDataInfoMap
+    if (!this.tableSegmentMetaDataInfoMap.isEmpty() && null != this.tableSegmentMetaDataInfoMap
+        .get(tableName) && null != this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) {
+      // get previous blockColumn metadata information
+      BlockColumnMetaDataInfo previousBlockColumnMetaInfo =
+          this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId);
+      // compare and get updated min and max values
+      byte[][] updatedMin = compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMin(),
+          currentBlockColumnMetaInfo.getMin(), true);
+      byte[][] updatedMax = compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMax(),
+          currentBlockColumnMetaInfo.getMax(), false);
+      // update the segment
+      this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)
+          .setMinMax(updatedMin, updatedMax);
+    } else {
+      Map<String, BlockColumnMetaDataInfo> segmentMinMaxMap = new HashMap<>();
+      if (null != this.tableSegmentMetaDataInfoMap.get(tableName)
+          && !this.tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) {
+        segmentMinMaxMap = this.tableSegmentMetaDataInfoMap.get(tableName);
+      }
+      segmentMinMaxMap.put(segmentId, currentBlockColumnMetaInfo);
+      this.tableSegmentMetaDataInfoMap.put(tableName, segmentMinMaxMap);
+    }
+  }
+
+  /**
+   * Clear the corresponding segmentId and tableName from the segmentMinMaxMap
+   */
+  public synchronized void clear(String tableName, String segmentId) {
+    if (null != tableSegmentMetaDataInfoMap.get(tableName)) {
+      if (null != tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) {
+        tableSegmentMetaDataInfoMap.get(tableName).remove(segmentId);
+      }
+      if (tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) {
+        tableSegmentMetaDataInfoMap.remove(tableName);
+      }
+    }
+  }
+
+  /**
+   * This method will do min/max comparison of values and update if required
+   */
+  public synchronized byte[] compareAndUpdateMinMax(byte[] minMaxValueCompare1,
+      byte[] minMaxValueCompare2, boolean isMinValueComparison) {
+    // Compare and update min max values
+    byte[] updatedMinMaxValues = new byte[minMaxValueCompare1.length];
+    System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, minMaxValueCompare1.length);
+    int compare =
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(minMaxValueCompare2, minMaxValueCompare1);
+    if (isMinValueComparison) {
+      if (compare < 0) {
+        updatedMinMaxValues = minMaxValueCompare2;
+      }
+    } else if (compare > 0) {
+      updatedMinMaxValues = minMaxValueCompare2;
+    }
+    return updatedMinMaxValues;
+  }
+
+  private synchronized byte[][] compareAndUpdateMinMax(byte[][] minMaxValueCompare1,
+      byte[][] minMaxValueCompare2, boolean isMinValueComparison) {
+    // Compare and update min max values
+    byte[][] updatedMinMaxValues = new byte[minMaxValueCompare1.length][];
+    System.arraycopy(minMaxValueCompare1, 0, updatedMinMaxValues, 0, minMaxValueCompare1.length);
+    for (int i = 0; i < minMaxValueCompare1.length; i++) {
+      int compare = ByteUtil.UnsafeComparer.INSTANCE
+          .compareTo(minMaxValueCompare2[i], minMaxValueCompare1[i]);
+      if (isMinValueComparison) {
+        if (compare < 0) {
+          updatedMinMaxValues[i] = minMaxValueCompare2[i];
+        }
+      } else if (compare > 0) {
+        updatedMinMaxValues[i] = minMaxValueCompare2[i];
+      }
+    }
+    return updatedMinMaxValues;
+  }
+
+}
\ No newline at end of file
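
The merge helpers above keep the smaller byte array when comparing min values and the
larger one when comparing max values. A hypothetical illustration of the single-value
variant (the byte values and variable names below are made up):

    byte[] previousMin = new byte[] {5};
    byte[] currentMin = new byte[] {3};
    // isMinValueComparison = true and {3} compares lower than {5},
    // so the updated segment-level min becomes {3}.
    byte[] updatedMin = SegmentMetaDataInfoStats.getInstance()
        .compareAndUpdateMinMax(previousMin, currentMin, true);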
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 1f521bdc..a570d4c 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -146,6 +146,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.query.prefetch.enable | true | By default this property is true, so prefetch is used in query to read next blocklet asynchronously in other thread while processing current blocklet in main thread. This can help to reduce CPU idle time. Setting this property false will disable this prefetch feature in query. |
 | carbon.query.stage.input.enable | false | Stage input files are data files written by external applications (such as Flink), but have not been loaded into carbon table. Enabling this configuration makes query to include these files, thus makes query on latest data. However, since these files are not indexed, query maybe slower as full scan is required for these files. |
 | carbon.driver.pruning.multi.thread.enable.files.count | 100000 | To prune in multi-thread when total number of segment files for a query increases beyond the configured value. |
+| carbon.load.all.segment.indexes.to.cache | true | Setting this configuration to false will prune and load only the matched segment indexes to cache, using segment metadata information such as column id and its min/max values, which reduces driver memory usage. |
 
 ## Data Mutation Configuration
 | Parameter | Default Value | Description |
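
As a usage sketch for the property documented above (mirroring what the new TestPruneUsingSegmentMinMax test does), the flag can be turned off before loading and querying so that only matched segment indexes are cached; the object and method names below are illustrative only:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    object SegmentIndexCacheConfig {
      // load only the matched segment indexes to the driver cache
      def restrictIndexCacheToMatchedSegments(): Unit = {
        CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE, "false")
      }

      // restore the default behaviour (cache all segment indexes)
      def restoreDefault(): Unit = {
        CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
          CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT)
      }
    }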
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index b332849..8fb3002 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.LongAccumulator
+import org.apache.spark.util.{CollectionAccumulator, LongAccumulator}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
@@ -56,7 +57,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, InsertTaskCompletionListener, StringArrayRow}
 import org.apache.carbondata.spark.util.{CommonUtil, Util}
 import org.apache.carbondata.store.CarbonRowReadSupport
 
@@ -70,7 +71,9 @@ object DataLoadProcessBuilderOnSpark {
       sparkSession: SparkSession,
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
-      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
       dataFrame.get.rdd
@@ -160,10 +163,10 @@ object DataLoadProcessBuilderOnSpark {
 
     // 4. Write
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
-      setTaskListener()
-      val model = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
+      val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
       DataLoadProcessorStepOnSpark.writeFunc(
-        rows, context.partitionId, model, writeStepRowCounter, conf.value.value)
+        rows, context.partitionId, loadModel, writeStepRowCounter, conf.value.value)
     })
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
@@ -186,7 +189,9 @@ object DataLoadProcessBuilderOnSpark {
       sparkSession: SparkSession,
       scanResultRDD : RDD[InternalRow],
       model: CarbonLoadModel,
-      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     val originRDD = scanResultRDD
 
     val sc = sparkSession.sparkContext
@@ -245,9 +250,9 @@ object DataLoadProcessBuilderOnSpark {
 
     // 3. Write
     sc.runJob(newRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
-      setTaskListener()
-      val model = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
-      DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, model,
+      setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
+      val loadModel = modelBroadcast.value.getCopyWithTaskNo(context.partitionId.toString)
+      DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, loadModel,
         writeStepRowCounter, conf.value.value)
     })
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
@@ -298,7 +303,9 @@ object DataLoadProcessBuilderOnSpark {
   def loadDataUsingRangeSort(
       sparkSession: SparkSession,
       model: CarbonLoadModel,
-      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     // initialize and prepare row counter
     val sc = sparkSession.sparkContext
     val modelBroadcast = sc.broadcast(model)
@@ -349,7 +356,7 @@ object DataLoadProcessBuilderOnSpark {
 
     // 4. Sort and Write data
     sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) => {
-      setTaskListener()
+      setTaskListener(model.getTableName, model.getSegmentId, segmentMetaDataAccumulator)
       DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
         writeStepRowCounter, conf.value.value)
     })
@@ -479,9 +486,11 @@ object DataLoadProcessBuilderOnSpark {
     }
   }
 
-  def setTaskListener(): Unit = {
-    TaskContext.get.addTaskCompletionListener { _ =>
-      CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+  def setTaskListener(tableName: String,
+      segmentId: String,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]): Unit = {
+    TaskContext.get.addTaskCompletionListener {
+      new InsertTaskCompletionListener(null, null, segmentMetaDataAccumulator, tableName, segmentId)
     }
     TaskMetricsMap.initializeThreadLocal()
     val carbonTaskInfo = new CarbonTaskInfo
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a7603e2..4777ab8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -56,6 +57,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsa
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.segmentmeta.{SegmentMetaDataInfo, SegmentMetaDataInfoStats}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -316,7 +318,10 @@ object CarbonDataRDDFactory {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
     var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
-
+    // accumulator to collect segment metadata
+    val segmentMetaDataAccumulator = sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
     // create new segment folder  in carbon store
     if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable) {
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
@@ -339,7 +344,8 @@ object CarbonDataRDDFactory {
             carbonLoadModel,
             updateModel,
             carbonTable,
-            hadoopConf)
+            hadoopConf,
+            segmentMetaDataAccumulator)
           res.foreach { resultOfSeg =>
             resultOfSeg.foreach { resultOfBlock =>
               if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
@@ -373,9 +379,14 @@ object CarbonDataRDDFactory {
                 .sparkSession,
                 convertedRdd,
                 carbonLoadModel,
-                hadoopConf)
+                hadoopConf,
+                segmentMetaDataAccumulator)
             } else {
-              loadDataFrame(sqlContext, None, Some(convertedRdd), carbonLoadModel)
+              loadDataFrame(sqlContext,
+                None,
+                Some(convertedRdd),
+                carbonLoadModel,
+                segmentMetaDataAccumulator)
             }
           } else {
             if (dataFrame.isEmpty && isSortTable &&
@@ -383,16 +394,24 @@ object CarbonDataRDDFactory {
                 (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
                  sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
               DataLoadProcessBuilderOnSpark
-                .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf)
+                .loadDataUsingRangeSort(sqlContext.sparkSession,
+                  carbonLoadModel,
+                  hadoopConf,
+                  segmentMetaDataAccumulator)
             } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
               DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
                 dataFrame,
                 carbonLoadModel,
-                hadoopConf)
+                hadoopConf,
+                segmentMetaDataAccumulator)
             } else if (dataFrame.isDefined) {
-              loadDataFrame(sqlContext, dataFrame, None, carbonLoadModel)
+              loadDataFrame(sqlContext,
+                dataFrame,
+                None,
+                carbonLoadModel,
+                segmentMetaDataAccumulator)
             } else {
-              loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
+              loadDataFile(sqlContext, carbonLoadModel, hadoopConf, segmentMetaDataAccumulator)
             }
           }
           val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
@@ -479,7 +498,17 @@ object CarbonDataRDDFactory {
             segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
           }
         }
-        val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)
+        var segmentMetaDataInfoMap = scala.collection.mutable.Map.empty[String, SegmentMetaDataInfo]
+        if (!segmentMetaDataAccumulator.isZero) {
+          segmentMetaDataAccumulator.value.asScala.foreach(map => if (map.nonEmpty) {
+            segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map
+          })
+        }
+        val segmentFiles = updateSegmentFiles(carbonTable,
+          segmentDetails,
+          updateModel.get,
+          segmentMetaDataInfoMap.asJava)
+
         // this means that the update doesnt have any records to update so no need to do table
         // status file updation.
         if (resultSize == 0) {
@@ -550,9 +579,14 @@ object CarbonDataRDDFactory {
           loadStatus
         }
 
+      val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
+        carbonLoadModel.getSegmentId,
+        segmentMetaDataAccumulator)
       val segmentFileName =
         SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
-          String.valueOf(carbonLoadModel.getFactTimeStamp))
+          String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
+      // clear segmentMetaDataAccumulator
+      segmentMetaDataAccumulator.reset()
 
       SegmentFileStore.updateTableStatusFile(
         carbonTable,
@@ -667,7 +701,8 @@ object CarbonDataRDDFactory {
   private def updateSegmentFiles(
       carbonTable: CarbonTable,
       segmentDetails: util.HashSet[Segment],
-      updateModel: UpdateTableModel) = {
+      updateModel: UpdateTableModel,
+      segmentMetaDataInfoMap: util.Map[String, SegmentMetaDataInfo]) = {
     val metadataDetails =
       SegmentStatusManager.readTableStatusFile(
         CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
@@ -677,11 +712,13 @@ object CarbonDataRDDFactory {
       val segmentFile = load.getSegmentFile
       var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
 
+      val segmentMetaDataInfo = segmentMetaDataInfoMap.get(seg.getSegmentNo)
       val file = SegmentFileStore.writeSegmentFile(
         carbonTable,
         seg.getSegmentNo,
         String.valueOf(System.currentTimeMillis()),
-        load.getPath)
+        load.getPath,
+        segmentMetaDataInfo)
 
       if (segmentFile != null) {
         segmentFiles ++= FileFactory.getCarbonFile(
@@ -720,7 +757,9 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       updateModel: Option[UpdateTableModel],
       carbonTable: CarbonTable,
-      hadoopConf: Configuration): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
+  ): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
     val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
 
     val updateRdd = dataFrame.get.rdd
@@ -773,7 +812,8 @@ object CarbonDataRDDFactory {
           updateModel,
           segId.getSegmentNo,
           newTaskNo,
-          partition).toList).toIterator
+          partition,
+          segmentMetaDataAccumulator).toList).toIterator
       }.collect()
     }
   }
@@ -786,7 +826,9 @@ object CarbonDataRDDFactory {
       updateModel: Option[UpdateTableModel],
       key: String,
       taskNo: Long,
-      iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+      iter: Iterator[Row],
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
+  ): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     val rddResult = new updateResultImpl()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
@@ -811,7 +853,8 @@ object CarbonDataRDDFactory {
           index,
           iter,
           carbonLoadModel,
-          loadMetadataDetails)
+          loadMetadataDetails,
+          segmentMetaDataAccumulator)
       } catch {
         case e: NoRetryException =>
           loadMetadataDetails
@@ -994,7 +1037,8 @@ object CarbonDataRDDFactory {
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
       scanResultRDD: Option[RDD[InternalRow]],
-      carbonLoadModel: CarbonLoadModel
+      carbonLoadModel: CarbonLoadModel,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     try {
       val rdd = if (dataFrame.isDefined) {
@@ -1024,7 +1068,8 @@ object CarbonDataRDDFactory {
         sqlContext.sparkSession,
         new DataLoadResultImpl(),
         carbonLoadModel,
-        newRdd
+        newRdd,
+        segmentMetaDataAccumulator
       ).collect()
     } catch {
       case ex: Exception =>
@@ -1039,7 +1084,8 @@ object CarbonDataRDDFactory {
   private def loadDataFile(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration
+      hadoopConf: Configuration,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     /*
      * when data load handle by node partition
@@ -1134,7 +1180,9 @@ object CarbonDataRDDFactory {
       sqlContext.sparkSession,
       new DataLoadResultImpl(),
       carbonLoadModel,
-      blocksGroupBy
+      blocksGroupBy,
+      segmentMetaDataAccumulator
     ).collect()
   }
+
 }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index a0baad0..5bd05a9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -24,9 +24,11 @@ import org.apache.spark.Partition
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -41,11 +43,13 @@ class CarbonIUDMergerRDD[K, V](
     @transient private val ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    carbonMergerMapping: CarbonMergerMapping)
+    carbonMergerMapping: CarbonMergerMapping,
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
   extends CarbonMergerRDD[K, V](ss,
     result,
     carbonLoadModel,
-    carbonMergerMapping) {
+    carbonMergerMapping,
+    segmentMetaDataAccumulator) {
 
   override def internalGetPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4278ade..5c2cc1b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -55,13 +56,13 @@ import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.expression
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonInputSplitWrapper, CarbonMultiBlockSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
-import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
@@ -73,7 +74,8 @@ class CarbonMergerRDD[K, V](
     @transient private val ss: SparkSession,
     result: MergeResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    carbonMergerMapping: CarbonMergerMapping)
+    carbonMergerMapping: CarbonMergerMapping,
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
   extends CarbonRDD[(K, V)](ss, Nil) {
 
   ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -121,7 +123,8 @@ class CarbonMergerRDD[K, V](
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
       var processor: AbstractResultProcessor = null
-      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = _
+      var rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]] = new util
+      .HashMap[String, util.List[RawResultIterator]]()
       try {
         // sorting the table block info List.
         val splitList = if (null == rangeColumn || singleRange) {
@@ -202,8 +205,14 @@ class CarbonMergerRDD[K, V](
           new SparkDataTypeConverterImpl)
 
         // add task completion listener to clean up the resources
-        context.addTaskCompletionListener { _ =>
-          close()
+        context.addTaskCompletionListener {
+          new CompactionTaskCompletionListener(carbonLoadModel,
+            exec,
+            processor,
+            rawResultIteratorMap,
+            segmentMetaDataAccumulator,
+            queryStartTime
+          )
         }
         try {
           // fire a query and get the results.
@@ -265,33 +274,6 @@ class CarbonMergerRDD[K, V](
           throw e
       }
 
-      private def close(): Unit = {
-        deleteLocalDataFolders()
-        // close all the query executor service and clean up memory acquired during query processing
-        if (null != exec) {
-          LOGGER.info("Cleaning up query resources acquired during compaction")
-          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime)
-          exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime)
-        }
-        // clean up the resources for processor
-        if (null != processor) {
-          LOGGER.info("Closing compaction processor instance to clean up loading resources")
-          processor.close()
-        }
-      }
-
-      private def deleteLocalDataFolders(): Unit = {
-        try {
-          LOGGER.info("Deleting local folder store location")
-          val isCompactionFlow = true
-          TableProcessingOperations
-            .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false)
-        } catch {
-          case e: Exception =>
-            LOGGER.error(e)
-        }
-      }
-
       var finished = false
 
       override def hasNext: Boolean = {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 4dd472b..d1d8743 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -29,8 +29,9 @@ import org.apache.hadoop.mapreduce.{InputSplit, Job}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.spark.util.MergeIndexUtil
+import org.apache.spark.util.{CollectionAccumulator, MergeIndexUtil}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
@@ -38,6 +39,7 @@ import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.CarbonUtil
@@ -194,6 +196,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       OperationListenerBus.getInstance().fireEvent(dataMapPreExecutionEvent,
         dataMapOperationContext)
     }
+    // accumulator to collect segment metadata
+    val segmentMetaDataAccumulator = sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
 
     val mergeStatus =
       if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
@@ -201,19 +207,24 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
-          carbonMergerMapping
+          carbonMergerMapping,
+          segmentMetaDataAccumulator
         ).collect
       } else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
                  !carbonTable.getSortColumns.isEmpty &&
                  carbonTable.getRangeColumn == null &&
                  CarbonUtil.isStandardCarbonTable(carbonTable)) {
-        compactSegmentsByGlobalSort(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
+        compactSegmentsByGlobalSort(sc.sparkSession,
+          carbonLoadModel,
+          carbonMergerMapping,
+          segmentMetaDataAccumulator)
       } else {
         new CarbonMergerRDD(
           sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
-          carbonMergerMapping
+          carbonMergerMapping,
+          segmentMetaDataAccumulator
         ).collect
       }
 
@@ -248,21 +259,31 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       } else {
         // Get the segment files each updated segment in case of IUD compaction
         if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
-          val segmentFilesList = loadsToMerge.asScala.map{seg =>
+          val segmentFilesList = loadsToMerge.asScala.map { seg =>
+            val segmentMetaDataInfo = new SegmentFileStore(carbonLoadModel.getTablePath,
+              seg.getSegmentFile).getSegmentFile.getSegmentMetaDataInfo
             val file = SegmentFileStore.writeSegmentFile(
               carbonTable,
               seg.getLoadName,
-              carbonLoadModel.getFactTimeStamp.toString)
+              carbonLoadModel.getFactTimeStamp.toString,
+              segmentMetaDataInfo)
             new Segment(seg.getLoadName, file)
           }.filter(_.getSegmentFileName != null).asJava
           segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
         } else {
+          // get segmentMetadata info from accumulator
+          val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
+            mergedLoadNumber,
+            segmentMetaDataAccumulator)
           segmentFileName = SegmentFileStore.writeSegmentFile(
             carbonTable,
             mergedLoadNumber,
-            carbonLoadModel.getFactTimeStamp.toString)
+            carbonLoadModel.getFactTimeStamp.toString,
+            segmentMetaDataInfo)
         }
       }
+      // clear segmentMetaDataAccumulator
+      segmentMetaDataAccumulator.reset()
       // Used to inform the commit listener that the commit is fired from compaction flow.
       operationContext.setProperty("isCompaction", "true")
       // trigger event for compaction
@@ -362,7 +383,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
   def compactSegmentsByGlobalSort(
       sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
-      carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
+      carbonMergerMapping: CarbonMergerMapping,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : Array[(String, Boolean)] = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val splits = splitsOfSegments(
       sparkSession,
@@ -386,7 +409,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         sparkSession,
         Option(dataFrame),
         outputModel,
-        SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+        SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
+        segmentMetaDataAccumulator)
         .map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala
new file mode 100644
index 0000000..c494b7f
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CompactionTaskCompletionListener.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+
+import org.apache.log4j.Logger
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonCompactionTaskCompletionListener
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.util.CollectionAccumulator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{AbstractResultProcessor, CarbonCompactionExecutor, CarbonCompactionUtil}
+
+class CompactionTaskCompletionListener(
+    carbonLoadModel: CarbonLoadModel,
+    exec: CarbonCompactionExecutor,
+    processor: AbstractResultProcessor,
+    rawResultIteratorMap: util.Map[String, util.List[RawResultIterator]],
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]],
+    queryStartTime: Long)
+  extends CarbonCompactionTaskCompletionListener {
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    deleteLocalDataFolders()
+    // close all the query executor service and clean up memory acquired during query processing
+    if (null != exec) {
+      LOGGER.info("Cleaning up query resources acquired during compaction")
+      exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.UNSORTED_IDX), queryStartTime)
+      exec.close(rawResultIteratorMap.get(CarbonCompactionUtil.SORTED_IDX), queryStartTime)
+    }
+    // clean up the resources for processor
+    if (null != processor) {
+      LOGGER.info("Closing compaction processor instance to clean up loading resources")
+      processor.close()
+    }
+    // fill segment metadata to accumulator
+    CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator(carbonLoadModel.getTableName,
+      carbonLoadModel.getSegmentId,
+      segmentMetaDataAccumulator)
+  }
+
+  private def deleteLocalDataFolders(): Unit = {
+    try {
+      LOGGER.info("Deleting local folder store location")
+      val isCompactionFlow = true
+      TableProcessingOperations
+        .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e)
+    }
+  }
+
+}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
index b6a2fa7..0daf4a3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -20,21 +20,34 @@ package org.apache.carbondata.spark.rdd
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.util.CollectionAccumulator
 
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalTaskInfo}
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
 import org.apache.carbondata.spark.util.CommonUtil
 
 class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor,
-    executorErrors: ExecutionErrors)
+    executorErrors: ExecutionErrors,
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]],
+    tableName: String,
+    segmentId: String)
   extends CarbonLoadTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
     try {
-      dataLoadExecutor.close()
+      // fill segment metadata to accumulator
+      CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator(
+        tableName,
+        segmentId,
+        segmentMetaDataAccumulator)
+      if (null != dataLoadExecutor) {
+        dataLoadExecutor.close()
+      }
     }
     catch {
       case e: Exception =>
-        if (executorErrors.failureCauses == FailureCauses.NONE) {
+        if (null != executorErrors && executorErrors.failureCauses == FailureCauses.NONE) {
           // If already error happened before task completion,
           // that error need to be thrown. Not the new error. Hence skip this.
           throw e
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index c1f18b4..23a2683 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,13 +32,14 @@ import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.util.SparkUtil
+import org.apache.spark.util.{CollectionAccumulator, SparkUtil}
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -100,7 +101,8 @@ class NewCarbonDataLoadRDD[K, V](
     @transient private val ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    blocksGroupBy: Array[(String, Array[BlockDetails])])
+    blocksGroupBy: Array[(String, Array[BlockDetails])],
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
   extends CarbonRDD[(K, V)](ss, Nil) {
 
   ss.sparkContext.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -146,7 +148,13 @@ class NewCarbonDataLoadRDD[K, V](
         val executor = new DataLoadExecutor()
         // in case of success, failure or cancelation clear memory and stop execution
         context
-          .addTaskCompletionListener { new InsertTaskCompletionListener(executor, executionErrors) }
+          .addTaskCompletionListener {
+            new InsertTaskCompletionListener(executor,
+              executionErrors,
+              segmentMetaDataAccumulator,
+              carbonLoadModel.getTableName,
+              carbonLoadModel.getSegment.getSegmentNo)
+          }
         executor.execute(model,
           loader.storeLocation,
           recordReaders)
@@ -247,7 +255,9 @@ class NewDataFrameLoaderRDD[K, V](
     @transient private val ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    prev: DataLoadCoalescedRDD[_]) extends CarbonRDD[(K, V)](ss, prev) {
+    prev: DataLoadCoalescedRDD[_],
+    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
+    ) extends CarbonRDD[(K, V)](ss, prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -298,7 +308,11 @@ class NewDataFrameLoaderRDD[K, V](
         val executor = new DataLoadExecutor
         // in case of success, failure or cancelation clear memory and stop execution
         context
-          .addTaskCompletionListener(new InsertTaskCompletionListener(executor, executionErrors))
+          .addTaskCompletionListener(new InsertTaskCompletionListener(executor,
+            executionErrors,
+            segmentMetaDataAccumulator,
+            carbonLoadModel.getTableName,
+            carbonLoadModel.getSegment.getSegmentNo))
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: NoRetryException =>
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index f4fdbc1..3060d4a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -21,9 +21,12 @@ import scala.collection.mutable
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
+import org.apache.spark.util.CollectionAccumulator
 
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations}
@@ -40,7 +43,8 @@ object UpdateDataLoad {
       index: Long,
       iter: Iterator[Row],
       carbonLoadModel: CarbonLoadModel,
-      loadMetadataDetails: LoadMetadataDetails): Unit = {
+      loadMetadataDetails: LoadMetadataDetails,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]): Unit = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     try {
       val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
@@ -58,6 +62,11 @@ object UpdateDataLoad {
       loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
       val executor = new DataLoadExecutor
       TaskContext.get().addTaskCompletionListener { context =>
+        // fill segment metadata to accumulator
+        CommonLoadUtils.fillSegmentMetaDataInfoToAccumulator(
+          carbonLoadModel.getTableName,
+          segId,
+          segmentMetaDataAccumulator)
         executor.close()
         CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
       }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
index 5547228..f3a63fb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
@@ -39,6 +39,11 @@ trait CarbonQueryTaskCompletionListener extends TaskCompletionListener
  */
 trait CarbonLoadTaskCompletionListener extends TaskCompletionListener
 
+/**
+ * Compaction completion listener
+ */
+trait CarbonCompactionTaskCompletionListener extends TaskCompletionListener
+
 case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow],
     freeMemory: Boolean = false) extends CarbonQueryTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index b971dcd..c1888cb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -282,17 +283,27 @@ case class CarbonInsertFromStageCommand(
                   s"${table.getDatabaseName}.${table.getTableName}")
       val start = System.currentTimeMillis()
       val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+      // accumulator to collect segment metadata info such as columnId and its minMax values
+      val segmentMetaDataAccumulator = spark.sqlContext
+        .sparkContext
+        .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
       if (table.getBucketingInfo == null) {
         DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
           spark,
           Option(dataFrame),
           loadModel,
-          SparkSQLUtil.sessionState(spark).newHadoopConf()
+          SparkSQLUtil.sessionState(spark).newHadoopConf(),
+          segmentMetaDataAccumulator
         ).map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
       } else {
-        CarbonDataRDDFactory.loadDataFrame(spark.sqlContext, Option(dataFrame), None, loadModel)
+        CarbonDataRDDFactory.loadDataFrame(
+          spark.sqlContext,
+          Option(dataFrame),
+          None,
+          loadModel,
+          segmentMetaDataAccumulator)
       }
       LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 40a732a..f355795 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, CollectionAccumulator, SparkUtil}
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -56,6 +56,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
+import org.apache.carbondata.core.segmentmeta.{SegmentMetaDataInfo, SegmentMetaDataInfoStats}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -1089,4 +1090,80 @@ object CommonLoadUtils {
     }
   }
 
+  /**
+   * Fill segment level metadata to accumulator based on tableName and segmentId
+   */
+  def fillSegmentMetaDataInfoToAccumulator(
+      tableName: String,
+      segmentId: String,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : CollectionAccumulator[Map[String, SegmentMetaDataInfo]] = {
+    synchronized {
+      val segmentMetaDataInfo = SegmentMetaDataInfoStats.getInstance()
+        .getTableSegmentMetaDataInfo(tableName, segmentId)
+      if (null != segmentMetaDataInfo) {
+        segmentMetaDataAccumulator.add(scala.Predef
+          .Map(segmentId -> segmentMetaDataInfo))
+        SegmentMetaDataInfoStats.getInstance().clear(tableName, segmentId)
+      }
+      segmentMetaDataAccumulator
+    }
+  }
+
+  /**
+   * Collect segmentMetaDataInfo from all tasks and compare min-max values and prepare final
+   * segmentMetaDataInfo
+   *
+   * @param segmentId           collect the segmentMetaDataInfo for the corresponding segmentId
+   * @param metaDataAccumulator segmentMetaDataAccumulator
+   * @return segmentMetaDataInfo
+   */
+  def getSegmentMetaDataInfoFromAccumulator(
+      segmentId: String,
+      metaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+  : SegmentMetaDataInfo = {
+    var segmentMetaDataInfo: SegmentMetaDataInfo = null
+    if (!metaDataAccumulator.isZero) {
+      val segmentMetaData = metaDataAccumulator.value.asScala
+      segmentMetaData.foreach { segmentColumnMetaDataInfo =>
+        val currentValue = segmentColumnMetaDataInfo.get(segmentId)
+        if (currentValue.isDefined) {
+          if (null == segmentMetaDataInfo) {
+            segmentMetaDataInfo = currentValue.get
+          } else if (segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap.isEmpty) {
+            segmentMetaDataInfo = currentValue.get
+          } else {
+            val iterator = currentValue.get.getSegmentColumnMetaDataInfoMap
+              .entrySet()
+              .iterator()
+            while (iterator.hasNext) {
+              val currentSegmentColumnMetaDataMap = iterator.next()
+              if (segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                .containsKey(currentSegmentColumnMetaDataMap.getKey)) {
+                val currentMax = SegmentMetaDataInfoStats.getInstance()
+                  .compareAndUpdateMinMax(segmentMetaDataInfo
+                    .getSegmentColumnMetaDataInfoMap
+                    .get(currentSegmentColumnMetaDataMap.getKey)
+                    .getColumnMaxValue,
+                    currentSegmentColumnMetaDataMap.getValue.getColumnMaxValue,
+                    false)
+                val currentMin = SegmentMetaDataInfoStats.getInstance()
+                  .compareAndUpdateMinMax(segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                    .get(currentSegmentColumnMetaDataMap.getKey).getColumnMinValue,
+                    currentSegmentColumnMetaDataMap.getValue.getColumnMinValue,
+                    true)
+                segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                  .get(currentSegmentColumnMetaDataMap.getKey)
+                  .setColumnMaxValue(currentMax)
+                segmentMetaDataInfo.getSegmentColumnMetaDataInfoMap
+                  .get(currentSegmentColumnMetaDataMap.getKey)
+                  .setColumnMinValue(currentMin)
+              }
+            }
+          }
+        }
+      }
+    }
+    segmentMetaDataInfo
+  }
 }
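
The two helpers above are consumed from the driver after the load or compaction job finishes. A condensed, illustrative sketch of that flow is shown below; the runLoadJob parameter is a stand-in for the actual job, and the object name is assumed, while the other names follow the hunks in this patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.command.management.CommonLoadUtils
    import org.apache.spark.util.CollectionAccumulator

    import org.apache.carbondata.core.metadata.SegmentFileStore
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo

    object SegmentMetaDataFlowSketch {
      def writeSegmentWithMetaData(
          session: SparkSession,
          carbonTable: CarbonTable,
          segmentId: String,
          factTimeStamp: Long,
          runLoadJob: CollectionAccumulator[Map[String, SegmentMetaDataInfo]] => Unit): String = {
        val accumulator = session.sparkContext
          .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
        // executor tasks fill the accumulator via fillSegmentMetaDataInfoToAccumulator
        // from their task completion listeners
        runLoadJob(accumulator)
        // merge per-task min/max values into a single SegmentMetaDataInfo for the segment
        val segmentMetaDataInfo =
          CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(segmentId, accumulator)
        // persist the merged metadata into the segment file, then clear the accumulator
        val segmentFileName = SegmentFileStore.writeSegmentFile(
          carbonTable, segmentId, factTimeStamp.toString, segmentMetaDataInfo)
        accumulator.reset()
        segmentFileName
      }
    }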
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 91b5038..ef19585 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -203,6 +203,7 @@ object SecondaryIndexUtil {
               seg.getLoadName,
               segmentIdToLoadStartTimeMapping(seg.getLoadName).toString,
               carbonLoadModel.getFactTimeStamp.toString,
+              null,
               null)
             val segment = new Segment(seg.getLoadName, file)
             SegmentFileStore.updateTableStatusFile(indexCarbonTable,
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
new file mode 100644
index 0000000..ab8dd62
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    drop
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE, "false")
+  }
+
+  private def drop = {
+    sql("drop table if exists carbon")
+    sql("drop table if exists parquet")
+  }
+
+  test("test if matched segment is only loaded to cache") {
+    createTablesAndLoadData
+    checkAnswer(sql("select * from carbon where a=1"),sql("select * from parquet where a=1"))
+    val showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files cached"))
+    drop
+  }
+
+  private def createTablesAndLoadData = {
+    drop
+    sql("create table carbon(a int, b string, c double,d int,e timestamp) stored as carbondata")
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(5,'ab',23.4,5,'2017-09-01 00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("create table parquet(a int, b string, c double,d int,e timestamp) using parquet")
+    sql("insert into parquet values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into parquet values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into parquet values(5,'ab',23.4,5,'2017-09-01 00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+  }
+
+  test("test if matched segment is only loaded to cache after drop column") {
+    createTablesAndLoadData
+    checkAnswer(sql("select * from carbon where a=1"),sql("select * from parquet where a=1"))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files cached"))
+    checkAnswer(sql("select * from carbon where a=5"),sql("select * from parquet where a=5"))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/3 index files cached"))
+    sql("alter table carbon drop columns(d,e)")
+    sql("insert into carbon values(7,'gg',45.6),(8,'eg',45.6)")
+    checkAnswer(sql("select a from carbon where a=1"),sql("select a from parquet where a=1"))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/4 index files cached"))
+    checkAnswer(sql("select * from carbon where a=7"), Seq(Row(7, "gg", 45.6)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/4 index files cached"))
+    drop
+  }
+
+  test("test if matched segment is only loaded to cache after add column") {
+    createTablesAndLoadData
+    sql("alter table carbon add columns(g decimal(3,2))")
+    sql("insert into carbon values(7,'gg',45.6,3,'2017-09-01 00:00:00',23.5),(8,'eg',45.6,6,'2017-09-01 00:00:00', 4.34)")
+    checkAnswer(sql("select a from carbon where a=1"),sql("select a from parquet where a=1"))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/4 index files cached"))
+    checkAnswer(sql("select a from carbon where a=7"), Seq(Row(7)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("2/4 index files cached"))
+    drop
+  }
+
+  test("test segment pruning after update operation") {
+    createTablesAndLoadData
+    checkAnswer(sql("select a from carbon where a=1"), Seq(Row(1)))
+    var showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/3 index files cached"))
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(1,'ab',23.4,5,'2017-09-01 00:00:00'),(2,'aa',23.6,8,'2017-09-02 00:00:00')")
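+    // pruning on a=10 should cache only the matched segments' index files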
+    sql("update carbon set(a)=(10) where a=1").show(false)
+    checkAnswer(sql("select count(*) from carbon where a=10"), Seq(Row(3)))
+    showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("6/8 index files cached"))
+    drop
+  }
+
+  test("test segment pruning after altering sort column properties") {
+    createTablesAndLoadData
+    sql(s"alter table carbon set tblproperties('sort_scope'='local_sort', 'sort_columns'='a')")
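+    // segments written after this alter carry the new sort column in their segment metadata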
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(4,'aa',23.6,8,'2017-09-02 00:00:00')")
+    sql("insert into carbon values(3,'ab',23.4,5,'2017-09-01 00:00:00'),(6,'aa',23.6,8,'2017-09-02 00:00:00')")
+    assert(sql("select a from carbon where a=3").count() == 3)
+    val showCache = sql("show metacache on table carbon").collect()
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("3/5 index files cached"))
+  }
+
+  override def afterAll(): Unit = {
+    drop
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE,
+        CarbonCommonConstants.CARBON_LOAD_ALL_SEGMENT_INDEXES_TO_CACHE_DEFAULT)
+  }
+
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 17c47bd..fadaef0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -40,7 +40,10 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.segmentmeta.BlockColumnMetaDataInfo;
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfoStats;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
@@ -389,6 +392,20 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     indexHeader.setIs_sort(model.getSortScope() != null && model.getSortScope() != NO_SORT);
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
+    // collect each block's min/max values and record them for this segment in SegmentMetaDataInfoStats
+    CarbonTable carbonTable = model.getTableSpec().getCarbonTable();
+    if (null != model.getSegmentId() && !carbonTable.isHivePartitionTable() && !carbonTable
+        .isIndexTable()) {
+      for (BlockIndexInfo blockIndex : blockIndexInfoList) {
+        byte[][] min = blockIndex.getBlockletIndex().getMinMaxIndex().getMinValues();
+        byte[][] max = blockIndex.getBlockletIndex().getMinMaxIndex().getMaxValues();
+        BlockColumnMetaDataInfo blockColumnMetaDataInfo =
+            new BlockColumnMetaDataInfo(thriftColumnSchemaList, min, max);
+        SegmentMetaDataInfoStats.getInstance()
+            .setBlockMetaDataInfo(model.getTableName(), model.getSegmentId(),
+                blockColumnMetaDataInfo);
+      }
+    }
     String indexFileName;
     if (enableDirectlyWriteDataToStorePath) {
       String rawFileName = model.getCarbonDataDirectoryPath() + CarbonCommonConstants.FILE_SEPARATOR