Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/20 16:13:47 UTC

[1/2] carbondata git commit: [CARBONDATA-2361] Refactoring ReadCommittedStatus

Repository: carbondata
Updated Branches:
  refs/heads/master 2fc0ad306 -> c58eb43ba


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 30a620c..6e7f2d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -80,7 +80,7 @@ public class DataMapWriterListener {
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
     List<DataMapWriter> writers = registry.get(columns);
-    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
+    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 5bc85f8..2ac351c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -231,8 +231,8 @@ public final class CarbonDataMergerUtil {
                     .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
               }
              // Update segment file name in status file
-              int segmentFileIndex =
-                  segmentFilesToBeUpdated.indexOf(Segment.toSegment(loadDetail.getLoadName()));
+              int segmentFileIndex = segmentFilesToBeUpdated
+                  .indexOf(Segment.toSegment(loadDetail.getLoadName(), null));
               if (segmentFileIndex > -1) {
                 loadDetail.setSegmentFile(
                     segmentFilesToBeUpdated.get(segmentFileIndex).getSegmentFileName());
@@ -857,9 +857,9 @@ public final class CarbonDataMergerUtil {
       //check if this load is an already merged load.
       if (null != segment.getMergedLoadName()) {
 
-        segments.add(Segment.toSegment(segment.getMergedLoadName()));
+        segments.add(Segment.toSegment(segment.getMergedLoadName(), null));
       } else {
-        segments.add(Segment.toSegment(segment.getLoadName()));
+        segments.add(Segment.toSegment(segment.getLoadName(), null));
       }
     }
     return segments;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index aabe91a..2b4748f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -295,7 +295,8 @@ public final class CarbonLoaderUtil {
           // if the segments is in the list of marked for delete then update the status.
           if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
             detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
-          } else if (segmentFilesTobeUpdated.contains(Segment.toSegment(detail.getLoadName()))) {
+          } else if (segmentFilesTobeUpdated
+              .contains(Segment.toSegment(detail.getLoadName(), null))) {
             detail.setSegmentFile(
                 detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName()
                     + CarbonTablePath.SEGMENT_EXT);


[2/2] carbondata git commit: [CARBONDATA-2361] Refactoring ReadCommittedStatus

Posted by ja...@apache.org.
[CARBONDATA-2361] Refactoring ReadCommittedStatus

Move ReadCommittedStatus into Segment class

This closes #2179
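
For illustration (not part of the commit), a minimal sketch of what the move
means at a call site; the table path and ids below are hypothetical:

    import java.util.Map;

    import org.apache.carbondata.core.datamap.Segment;
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
    import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;

    public class ReadCommittedScopeSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical table identifier; any existing carbon table would do.
        AbsoluteTableIdentifier identifier =
            AbsoluteTableIdentifier.from("/tmp/store/default/t1", "default", "t1");

        // The scope is created once from the table status snapshot...
        ReadCommittedScope scope = new TableStatusReadCommittedScope(identifier);

        // ...and now travels inside each Segment, instead of being passed as an
        // extra readCommittedScope parameter to prune()/getDataMaps() as before.
        Segment segment = Segment.toSegment("0", scope);
        Map<String, String> indexFiles = segment.getCommittedIndexFile();
        System.out.println(indexFiles.keySet());
      }
    }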


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c58eb43b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c58eb43b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c58eb43b

Branch: refs/heads/master
Commit: c58eb43bacf4dd8918e647d507a74798f26fd3bb
Parents: 2fc0ad3
Author: sounakr <so...@gmail.com>
Authored: Wed Apr 18 09:08:27 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Apr 21 00:13:29 2018 +0800

----------------------------------------------------------------------
 .../apache/carbondata/core/datamap/Segment.java | 63 +++++++++++++++++---
 .../carbondata/core/datamap/TableDataMap.java   | 31 ++++------
 .../core/datamap/dev/DataMapFactory.java        |  5 +-
 .../datamap/dev/expr/AndDataMapExprWrapper.java | 10 ++--
 .../datamap/dev/expr/DataMapExprWrapper.java    |  4 +-
 .../dev/expr/DataMapExprWrapperImpl.java        |  7 +--
 .../datamap/dev/expr/OrDataMapExprWrapper.java  | 10 ++--
 .../core/indexstore/BlockletDetailsFetcher.java | 13 +---
 .../indexstore/SegmentPropertiesFetcher.java    |  4 +-
 .../blockletindex/BlockletDataMapFactory.java   | 45 ++++++--------
 .../core/metadata/SegmentFileStore.java         |  4 +-
 .../core/mutate/CarbonUpdateUtil.java           |  5 +-
 .../LatestFilesReadCommittedScope.java          |  1 +
 .../ReadCommittedIndexFileSnapShot.java         |  3 +-
 .../core/readcommitter/ReadCommittedScope.java  |  3 +-
 .../TableStatusReadCommittedScope.java          |  8 +++
 .../statusmanager/SegmentStatusManager.java     | 35 +++++++----
 .../apache/carbondata/core/util/CarbonUtil.java |  2 +-
 .../examples/MinMaxIndexDataMapFactory.java     | 10 +---
 .../lucene/LuceneCoarseGrainDataMapFactory.java |  9 +--
 .../lucene/LuceneFineGrainDataMapFactory.java   |  7 +--
 .../hadoop/api/CarbonFileInputFormat.java       |  9 ++-
 .../hadoop/api/CarbonInputFormat.java           | 16 +++--
 .../hadoop/api/CarbonOutputCommitter.java       |  5 +-
 .../hadoop/api/CarbonTableInputFormat.java      | 40 +++++++------
 .../hadoop/api/DistributableDataMapFormat.java  |  7 +--
 .../testsuite/datamap/CGDataMapTestCase.scala   |  4 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |  4 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  4 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  6 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |  4 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |  2 +-
 .../org/apache/spark/util/PartitionUtils.scala  |  7 ++-
 .../spark/rdd/AggregateDataMapCompactor.scala   |  2 +-
 .../spark/rdd/CarbonTableCompactor.scala        |  1 +
 .../preaaggregate/PreAggregateListeners.scala   |  2 +-
 .../datasources/SparkCarbonFileFormat.scala     | 11 ++--
 .../spark/sql/optimizer/CarbonFilters.scala     |  2 +-
 .../datamap/DataMapWriterListener.java          |  2 +-
 .../processing/merger/CarbonDataMergerUtil.java |  8 +--
 .../processing/util/CarbonLoaderUtil.java       |  3 +-
 41 files changed, 222 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index a2a2a41..0eb82ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -16,11 +16,14 @@
  */
 package org.apache.carbondata.core.datamap;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -36,9 +39,47 @@ public class Segment implements Serializable {
 
   private String segmentFileName;
 
+  /**
+   * Points to the Read Committed Scope of the segment. This is a flavor of
+   * transactional isolation level which allows only snapshot reads of the
+   * data and makes uncommitted data invisible to the reader.
+   */
+  private ReadCommittedScope readCommittedScope;
+
+  /**
+   * ReadCommittedScope will be null, so getCommittedIndexFile will not work and will throw
+   * a NullPointerException. If getCommittedIndexFile needs to be accessed, use the other
+   * constructor and pass a proper ReadCommittedScope.
+   * @param segmentNo
+   * @param segmentFileName
+   */
   public Segment(String segmentNo, String segmentFileName) {
     this.segmentNo = segmentNo;
     this.segmentFileName = segmentFileName;
+    this.readCommittedScope = null;
+  }
+
+  /**
+   *
+   * @param segmentNo
+   * @param segmentFileName
+   * @param readCommittedScope
+   */
+  public Segment(String segmentNo, String segmentFileName, ReadCommittedScope readCommittedScope) {
+    this.segmentNo = segmentNo;
+    this.segmentFileName = segmentFileName;
+    this.readCommittedScope = readCommittedScope;
+  }
+
+  /**
+   *
+   * @return without mergeIndex: a map with the absolute path of each index file as key and
+   * null as value; with mergeIndex: a map with the absolute path plus file name of the
+   * merge-index parent file as key and the mergeIndexFileName as value
+   * @throws IOException
+   */
+  public Map<String, String> getCommittedIndexFile() throws IOException {
+    return readCommittedScope.getCommittedIndexFile(this);
   }
 
   public String getSegmentNo() {
@@ -49,35 +90,39 @@ public class Segment implements Serializable {
     return segmentFileName;
   }
 
-  public static List<Segment> toSegmentList(String[] segmentIds) {
+  public static List<Segment> toSegmentList(String[] segmentIds,
+      ReadCommittedScope readCommittedScope) {
     List<Segment> list = new ArrayList<>(segmentIds.length);
     for (String segmentId : segmentIds) {
-      list.add(toSegment(segmentId));
+      list.add(toSegment(segmentId, readCommittedScope));
     }
     return list;
   }
 
-  public static List<Segment> toSegmentList(List<String> segmentIds) {
+  public static List<Segment> toSegmentList(List<String> segmentIds,
+      ReadCommittedScope readCommittedScope) {
     List<Segment> list = new ArrayList<>(segmentIds.size());
     for (String segmentId : segmentIds) {
-      list.add(toSegment(segmentId));
+      list.add(toSegment(segmentId, readCommittedScope));
     }
     return list;
   }
 
   /**
-   * SegmentId can be combination of segmentNo and segmentFileName
+   * readCommittedScope provides Read Snapshot isolation.
    * @param segmentId
+   * @param readCommittedScope
    * @return
    */
-  public static Segment toSegment(String segmentId) {
+  public static Segment toSegment(String segmentId, ReadCommittedScope readCommittedScope) {
+    // SegmentId can be combination of segmentNo and segmentFileName.
     String[] split = segmentId.split("#");
     if (split.length > 1) {
-      return new Segment(split[0], split[1]);
+      return new Segment(split[0], split[1], readCommittedScope);
     } else if (split.length > 0) {
-      return new Segment(split[0], null);
+      return new Segment(split[0], null, readCommittedScope);
     }
-    return new Segment(segmentId, null);
+    return new Segment(segmentId, null, readCommittedScope);
   }
 
   /**

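A hedged sketch of the constructor caveat and the '#' parsing documented above
(illustrative values; assumes the imports from the earlier sketch plus
java.io.IOException, and any ReadCommittedScope instance named scope):

    static void segmentSketch(ReadCommittedScope scope) throws IOException {
      // Built without a scope: readCommittedScope stays null, so calling
      // getCommittedIndexFile() on this instance would throw NullPointerException.
      Segment bare = new Segment("2", null);

      // "2#2_1.segment" splits on '#' into segmentNo "2" and segmentFileName "2_1.segment".
      Segment parsed = Segment.toSegment("2#2_1.segment", scope);
      assert "2".equals(parsed.getSegmentNo());

      // A plain id yields a null segmentFileName; the scope is still attached,
      // so getCommittedIndexFile() simply delegates to the scope.
      Map<String, String> files = Segment.toSegment("2", scope).getCommittedIndexFile();
    }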
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 8143b1c..2dc6317 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.events.OperationContext;
@@ -80,29 +79,26 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param segments
    * @param filterExp
-   * @param readCommittedScope
    * @return
    */
   public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp,
-      List<PartitionSpec> partitions, ReadCommittedScope readCommittedScope) throws IOException {
+      List<PartitionSpec> partitions) throws IOException {
     List<ExtendedBlocklet> blocklets = new ArrayList<>();
     SegmentProperties segmentProperties;
     for (Segment segment : segments) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       // if filter is not passed then return all the blocklets
       if (filterExp == null) {
-        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions,
-            readCommittedScope);
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
       } else {
-        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
-        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment,
-            readCommittedScope);
+        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
+        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
         for (DataMap dataMap : dataMaps) {
           pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
         }
       }
       blocklets.addAll(addSegmentId(
-          blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment, readCommittedScope),
+          blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
           segment.getSegmentNo()));
     }
     return blocklets;
@@ -143,19 +139,16 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param distributable
    * @param filterExp
-   * @param readCommittedScope
    * @return
    */
   public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
-      FilterResolverIntf filterExp, List<PartitionSpec> partitions,
-      ReadCommittedScope readCommittedScope) throws IOException {
+      FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     List<Blocklet> blocklets = new ArrayList<>();
-    List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable, readCommittedScope);
+    List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
     for (DataMap dataMap : dataMaps) {
       blocklets.addAll(dataMap.prune(filterExp,
-          segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment(),
-              readCommittedScope),
+          segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()),
           partitions));
     }
     BlockletSerializer serializer = new BlockletSerializer();
@@ -167,7 +160,7 @@ public final class TableDataMap extends OperationEventListener {
     }
     for (Blocklet blocklet : blocklets) {
       ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher
-          .getExtendedBlocklet(blocklet, distributable.getSegment(), readCommittedScope);
+          .getExtendedBlocklet(blocklet, distributable.getSegment());
       if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
         String blockletwritePath =
             writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
@@ -221,16 +214,14 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param segments
    * @param filterExp
-   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp,
-      ReadCommittedScope readCommittedScope)
+  public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
       throws IOException {
     List<Segment> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (Segment segment : segments) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
       for (DataMap dataMap : dataMaps) {
         if (dataMap.isScanRequired(filterExp)) {
           // If any one task in a given segment contains the data that means the segment need to

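After this change the pruning entry points take no scope argument; a minimal
sketch, assuming a TableDataMap, valid segments and a filter are already in hand:

    List<ExtendedBlocklet> blocklets =
        tableDataMap.prune(validSegments, filterResolver, partitionSpecs);
    // Each Segment in validSegments already carries its ReadCommittedScope,
    // so no scope is threaded through the call any more.
    List<Segment> prunedSegments =
        tableDataMap.pruneSegments(validSegments, filterResolver);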
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 70f2772..a315ce6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.events.Event;
 
 /**
@@ -48,12 +47,12 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Get the datamap for segmentid
    */
-  List<T> getDataMaps(Segment segment, ReadCommittedScope readCommittedScope) throws IOException;
+  List<T> getDataMaps(Segment segment) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<T> getDataMaps(DataMapDistributable distributable, ReadCommittedScope readCommittedScope)
+  List<T> getDataMaps(DataMapDistributable distributable)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
index 850e08a..c573dcb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -46,11 +45,10 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper {
   }
 
   @Override
-  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
-      ReadCommittedScope readCommittedScope) throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
-    List<ExtendedBlocklet> rightPrune =
-        right.prune(segments, partitionsToPrune, readCommittedScope);
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+      throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
     List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
     for (ExtendedBlocklet blocklet : leftPrune) {
       if (rightPrune.contains(blocklet)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
index b5fb173..14cfc33 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -24,7 +24,6 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -37,8 +36,7 @@ public interface DataMapExprWrapper extends Serializable {
   * It gets the blocklets from each leaf node datamap and applies expressions on the blocklets
   * using the list of segments; it is used in the case of a non-distributable datamap.
    */
-  List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
-      ReadCommittedScope readCommittedScope)
+  List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
       throws IOException;
 
   /**

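A sketch of composing expression wrappers under the new signature (the wrapper
constructors here are assumptions; only prune's shape comes from this diff):

    DataMapExprWrapper left = new DataMapExprWrapperImpl(tableDataMap, filterResolver);
    DataMapExprWrapper right = new DataMapExprWrapperImpl(otherDataMap, otherResolver);
    DataMapExprWrapper and = new AndDataMapExprWrapper(left, right, filterResolver);
    // Intersection of left.prune(...) and right.prune(...); OrDataMapExprWrapper
    // would union them instead.
    List<ExtendedBlocklet> hits = and.prune(segments, partitionsToPrune);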
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index ffd4f80..f9518ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 public class DataMapExprWrapperImpl implements DataMapExprWrapper {
@@ -47,9 +46,9 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
   }
 
   @Override
-  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
-      ReadCommittedScope readCommittedScope) throws IOException {
-    return dataMap.prune(segments, expression, partitionsToPrune, readCommittedScope);
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+      throws IOException {
+    return dataMap.prune(segments, expression, partitionsToPrune);
   }
 
   @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
index 0667d0a..93dd242 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -48,11 +47,10 @@ public class OrDataMapExprWrapper implements DataMapExprWrapper {
   }
 
   @Override
-  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
-      ReadCommittedScope readCommittedScope) throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
-    List<ExtendedBlocklet> rightPrune =
-        right.prune(segments, partitionsToPrune, readCommittedScope);
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+      throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
+    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
     Set<ExtendedBlocklet> andBlocklets = new HashSet<>();
     andBlocklets.addAll(leftPrune);
     andBlocklets.addAll(rightPrune);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index cf283f2..58c11db 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * Fetches the detailed blocklet which has more information to execute the query
@@ -32,12 +31,10 @@ public interface BlockletDetailsFetcher {
    *
    * @param blocklets
    * @param segment
-   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
-      ReadCommittedScope readCommittedScope)
+  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
       throws IOException;
 
   /**
@@ -45,21 +42,17 @@ public interface BlockletDetailsFetcher {
    *
    * @param blocklet
    * @param segment
-   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
-      ReadCommittedScope readCommittedScope) throws IOException;
+  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
 
   /**
    * Get all the blocklets in a segment
    *
    * @param segment
-   * @param readCommittedScope
    * @return
    */
-  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
-      ReadCommittedScope readCommittedScope)
+  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
       throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
index d464083..b7fb98c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * Fetches the detailed segmentProperties which has more information to execute the query
@@ -31,10 +30,9 @@ public interface SegmentPropertiesFetcher {
   /**
    * get the Segment properties based on the SegmentID.
    * @param segmentId
-   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  SegmentProperties getSegmentProperties(Segment segment, ReadCommittedScope readCommittedScope)
+  SegmentProperties getSegmentProperties(Segment segment)
       throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 4c8ac0c..caac733 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -84,24 +83,20 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     throw new UnsupportedOperationException("not implemented");
   }
 
-  @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment,
-      ReadCommittedScope readCommittedScope) throws IOException {
+  @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
+        getTableBlockIndexUniqueIdentifiers(segment);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
-  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment,
-      ReadCommittedScope readCommittedScope) throws IOException {
-    if (readCommittedScope == null) {
-      throw new IOException("readCommittedScope is null. Internal error");
-    }
+  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment)
+      throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      Map<String, String> indexFiles = readCommittedScope.getCommittedIndexFile(segment);
-      for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
+      Map<String, String> indexFiles = segment.getCommittedIndexFile();
+      for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
         Path indexFile = new Path(indexFileEntry.getKey());
         tableBlockIndexUniqueIdentifiers.add(
             new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
@@ -118,8 +113,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
    * default datamap.
    */
   @Override
-  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
-      ReadCommittedScope readCommittedScope)
+  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
       throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     // If it is already detailed blocklet then type cast and return same
@@ -130,7 +124,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       return detailedBlocklets;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
+        getTableBlockIndexUniqueIdentifiers(segment);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
       detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
@@ -139,14 +133,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
-      ReadCommittedScope readCommittedScope)
+  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
       throws IOException {
     if (blocklet instanceof ExtendedBlocklet) {
       return (ExtendedBlocklet) blocklet;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
+        getTableBlockIndexUniqueIdentifiers(segment);
     return getExtendedBlocklet(identifiers, blocklet);
   }
 
@@ -221,13 +214,12 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   @Override
   public void clear() {
     for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
-      clear(new Segment(segmentId, null));
+      clear(new Segment(segmentId, null, null));
     }
   }
 
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
-      ReadCommittedScope readCommittedScope)
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
@@ -267,9 +259,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
 
   }
 
-  @Override public SegmentProperties getSegmentProperties(Segment segment,
-      ReadCommittedScope readCommittedScope) throws IOException {
-    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
+  @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
+    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
     assert (dataMaps.size() > 0);
     CoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
     assert (coarseGrainDataMap instanceof BlockletDataMap);
@@ -277,13 +268,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return dataMap.getSegmentProperties();
   }
 
-  @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
-      ReadCommittedScope readCommittedScope) throws IOException {
+  @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
+      throws IOException {
     List<Blocklet> blocklets = new ArrayList<>();
-    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
+    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
     for (CoarseGrainDataMap dataMap : dataMaps) {
       blocklets.addAll(
-          dataMap.prune(null, getSegmentProperties(segment, readCommittedScope), partitions));
+          dataMap.prune(null, getSegmentProperties(segment), partitions));
     }
     return blocklets;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index d609c56..dff496b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -586,8 +586,8 @@ public class SegmentFileStore {
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
               .getValidAndInvalidSegments().getValidSegments());
       CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
-          Segment.toSegmentList(toBeDeleteSegments), Segment.toSegmentList(toBeUpdatedSegments),
-          uuid);
+          Segment.toSegmentList(toBeDeleteSegments, null),
+          Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 93e5580..1d0ef44 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -248,7 +248,8 @@ public class CarbonUpdateUtil {
                 // update end timestamp for each time.
                 loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
               }
-              if (segmentFilesTobeUpdated.contains(Segment.toSegment(loadMetadata.getLoadName()))) {
+              if (segmentFilesTobeUpdated
+                  .contains(Segment.toSegment(loadMetadata.getLoadName(), null))) {
                 loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
                     + CarbonTablePath.SEGMENT_EXT);
               }
@@ -553,7 +554,7 @@ public class CarbonUpdateUtil {
           }
         }
         if (updateSegmentFile) {
-          segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName()));
+          segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName(), null));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 631d12c..6afa280 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -42,6 +42,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
   private String carbonFilePath;
   private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
   private LoadMetadataDetails[] loadMetadataDetails;
+
   public LatestFilesReadCommittedScope(String path) {
     this.carbonFilePath = path;
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
index 4491a29..3e8e04f 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.readcommitter;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -29,7 +30,7 @@ import org.apache.carbondata.common.annotations.InterfaceStability;
  */
 @InterfaceAudience.Internal
 @InterfaceStability.Evolving
-public class ReadCommittedIndexFileSnapShot {
+public class ReadCommittedIndexFileSnapShot implements Serializable {
 
   /**
    * Segment Numbers are mapped with list of Index Files.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
index f6ba78e..9ae462b 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -17,6 +17,7 @@
 package org.apache.carbondata.core.readcommitter;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Map;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -29,7 +30,7 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
  */
 @InterfaceAudience.Internal
 @InterfaceStability.Stable
-public interface ReadCommittedScope {
+public interface ReadCommittedScope extends Serializable {
 
   public LoadMetadataDetails[] getSegmentList() throws IOException;
 

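Segment implements Serializable and now holds a ReadCommittedScope, so the scope
and its snapshot must be serializable too for segments shipped to tasks; a
minimal round-trip sketch (illustrative only):

    static Segment roundTrip(Segment segment) throws Exception {
      java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
      new java.io.ObjectOutputStream(bos).writeObject(segment);
      // Works only because ReadCommittedScope (and the index-file snapshot it
      // holds) now extend Serializable, as changed in this commit.
      return (Segment) new java.io.ObjectInputStream(
          new java.io.ByteArrayInputStream(bos.toByteArray())).readObject();
    }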
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index 4f54241..bc4a90d 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -35,7 +35,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 @InterfaceAudience.Internal
 @InterfaceStability.Stable
 public class TableStatusReadCommittedScope implements ReadCommittedScope {
+
   private LoadMetadataDetails[] loadMetadataDetails;
+
   private AbsoluteTableIdentifier identifier;
 
   public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) throws IOException {
@@ -43,6 +45,12 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
     takeCarbonIndexFileSnapShot();
   }
 
+  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
+      LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+    this.identifier = identifier;
+    this.loadMetadataDetails = loadMetadataDetails;
+  }
+
   @Override public LoadMetadataDetails[] getSegmentList() throws IOException {
     try {
       if (loadMetadataDetails == null) {

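The new two-argument constructor lets a caller that has already read the table
status hand it over and skip a second snapshot read; a hedged sketch, assuming
readTableStatusFile is the static helper used inside SegmentStatusManager:

    LoadMetadataDetails[] details = SegmentStatusManager.readTableStatusFile(
        CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
    // Reuses the snapshot already in hand instead of re-reading it:
    ReadCommittedScope scope = new TableStatusReadCommittedScope(identifier, details);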
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index be53f2b..a4011bc 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -47,6 +47,8 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DeleteLoadFolders;
@@ -98,15 +100,20 @@ public class SegmentStatusManager {
    * @throws IOException
    */
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
-    return getValidAndInvalidSegments(null);
+    return getValidAndInvalidSegments(null, null);
+  }
+
+  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
+      LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+    return getValidAndInvalidSegments(loadMetadataDetails, null);
   }
 
   /**
   * Get valid segments for the given load status details.
-   *
    */
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments(
-      LoadMetadataDetails[] loadMetadataDetails) throws IOException {
+      LoadMetadataDetails[] loadMetadataDetails, ReadCommittedScope readCommittedScope)
+      throws IOException {
 
     // @TODO: move reading LoadStatus file to separate class
     List<Segment> listOfValidSegments = new ArrayList<>(10);
@@ -120,6 +127,10 @@ public class SegmentStatusManager {
         loadMetadataDetails = readTableStatusFile(
             CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
       }
+
+      if (readCommittedScope == null) {
+        readCommittedScope = new TableStatusReadCommittedScope(identifier, loadMetadataDetails);
+      }
       //just directly iterate Array
       for (LoadMetadataDetails segment : loadMetadataDetails) {
         if (SegmentStatus.SUCCESS == segment.getSegmentStatus()
@@ -129,7 +140,8 @@ public class SegmentStatusManager {
             || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
           // check for merged loads.
           if (null != segment.getMergedLoadName()) {
-            Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile());
+            Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile(),
+                readCommittedScope);
             if (!listOfValidSegments.contains(seg)) {
               listOfValidSegments.add(seg);
             }
@@ -142,24 +154,25 @@ public class SegmentStatusManager {
 
           if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
 
-            listOfValidUpdatedSegments
-                .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+            listOfValidUpdatedSegments.add(
+                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
           }
           if (SegmentStatus.STREAMING == segment.getSegmentStatus()
               || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
-            listOfStreamSegments
-                .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+            listOfStreamSegments.add(
+                new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
             continue;
           }
-          listOfValidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+          listOfValidSegments.add(
+              new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
         } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
             || SegmentStatus.COMPACTED == segment.getSegmentStatus()
             || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
           listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
         } else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() ||
             SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) {
-          listOfInProgressSegments
-              .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+          listOfInProgressSegments.add(
+              new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope));
         }
       }
     } catch (IOException e) {

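With the defaulting above, existing callers are unchanged; a short sketch:

    SegmentStatusManager manager = new SegmentStatusManager(identifier);
    // No scope supplied: getValidAndInvalidSegments(null, null) builds a
    // TableStatusReadCommittedScope internally and attaches it to the valid,
    // updated, streaming and in-progress Segments it returns.
    SegmentStatusManager.ValidAndInvalidSegmentsInfo info =
        manager.getValidAndInvalidSegments();
    List<Segment> validSegments = info.getValidSegments();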
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 09692e6..91b35f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2440,7 +2440,7 @@ public final class CarbonUtil {
       }
       for (String value : values) {
         if (!value.equalsIgnoreCase("*")) {
-          Segment segment = Segment.toSegment(value);
+          Segment segment = Segment.toSegment(value, null);
           Float aFloatValue = Float.parseFloat(segment.getSegmentNo());
           if (aFloatValue < 0 || aFloatValue > Float.MAX_VALUE) {
             throw new InvalidConfigurationException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 01aaffa..7e43610 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -37,7 +37,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
@@ -98,13 +97,11 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   * getDataMaps factory method: initializes the Min Max Data Map and returns it.
    *
    * @param segment
-   * @param readCommittedScope
    * @return
    * @throws IOException
    */
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(Segment segment,
-      ReadCommittedScope readCommittedScope)
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment)
       throws IOException {
     List<CoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxIndexDataMap.
@@ -144,10 +141,9 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   @Override public void clear() {
   }
 
-  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
-      ReadCommittedScope readCommittedScope)
+  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
-    return getDataMaps(distributable.getSegment(), readCommittedScope);
+    return getDataMaps(distributable.getSegment());
   }
 
   @Override public void fireEvent(Event event) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index f63656b..e8c740d 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * CG level of lucene DataMap
@@ -44,8 +43,7 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
    * Get the datamap for segmentid
    */
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(Segment segment,
-      ReadCommittedScope readCommittedScope) throws IOException {
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<CoarseGrainDataMap> lstDataMap = new ArrayList<>();
     CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
     try {
@@ -64,10 +62,9 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
    * Get datamaps for distributable object.
    */
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
-      ReadCommittedScope readCommittedScope)
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
-    return getDataMaps(distributable.getSegment(), readCommittedScope);
+    return getDataMaps(distributable.getSegment());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 151e674..9026fbc 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -28,7 +28,6 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * FG level of lucene DataMap
@@ -39,8 +38,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
   /**
    * Get the datamap for segmentid
    */
-  @Override public List<FineGrainDataMap> getDataMaps(Segment segment,
-      ReadCommittedScope readCommittedScope) throws IOException {
+  @Override public List<FineGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<FineGrainDataMap> lstDataMap = new ArrayList<>();
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
     try {
@@ -59,8 +57,7 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
    * Get datamaps for distributable object.
    */
   @Override
-  public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable,
-      ReadCommittedScope readCommittedScope)
+  public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable)
       throws IOException {
     List<FineGrainDataMap> lstDataMap = new ArrayList<>();
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 214e534..3dac3bb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -131,7 +131,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       if (FileFactory.isFileExist(segmentDir, fileType)) {
         // if external table Segments are found, add it to the List
         List<Segment> externalTableSegments = new ArrayList<Segment>();
-        Segment seg = new Segment("null", null);
+        Segment seg = new Segment("null", null, readCommittedScope);
         externalTableSegments.add(seg);
 
         Map<String, String> indexFiles =
@@ -142,8 +142,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         }
         // do block filtering and get split
         List<InputSplit> splits =
-            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null,
-                readCommittedScope);
+            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
 
         return splits;
       }
@@ -161,7 +160,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
    */
   private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
+      List<Integer> oldPartitionIdList) throws IOException {
 
     numSegments = validSegments.size();
     List<InputSplit> result = new LinkedList<InputSplit>();
@@ -175,7 +174,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     // for each segment fetch blocks matching filter in Driver BTree
     List<CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
+            validSegments, partitionInfo, oldPartitionIdList);
     numBlocks = dataBlocksOfSegment.size();
     for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 

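The two-argument Segment constructor becomes a three-argument one in this
patch, so the scope is captured when the Segment is built instead of being
threaded through getSplits. A minimal sketch, assuming the signature implied by
the call sites above, Segment(String segmentNo, String segmentFileName,
ReadCommittedScope scope):

    // Scope is attached at construction time:
    Segment seg = new Segment("null", null, readCommittedScope);
    List<Segment> externalTableSegments = new ArrayList<Segment>();
    externalTableSegments.add(seg);

    // ... so getSplits no longer needs a separate scope argument:
    List<InputSplit> splits =
        getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
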
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index b1d7603..8016d90 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -42,7 +42,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -238,8 +237,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     String segmentNumbersFromProperty = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
     if (!segmentNumbersFromProperty.trim().equals("*")) {
-      CarbonInputFormat
-          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+      CarbonInputFormat.setSegmentsToAccess(conf,
+          Segment.toSegmentList(segmentNumbersFromProperty.split(","), null));
     }
   }
 
@@ -249,7 +248,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   public static void setQuerySegment(Configuration conf, String segmentList) {
     if (!segmentList.trim().equals("*")) {
       CarbonInputFormat
-          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(",")));
+          .setSegmentsToAccess(conf, Segment.toSegmentList(segmentList.split(","), null));
     }
   }
 
@@ -335,10 +334,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * get data blocks of given segment
    */
-  protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
-      CarbonTable carbonTable, FilterResolverIntf resolver,
-      BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
+  protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, CarbonTable carbonTable,
+      FilterResolverIntf resolver, BitSet matchedPartitions, List<Segment> segmentIds,
+      PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) throws IOException {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
@@ -362,7 +360,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       // Apply expression on the blocklets.
       prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     } else {
-      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune, readCommittedScope);
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
     }
 
     List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();

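Segment.toSegment and Segment.toSegmentList follow the same convention: the
scope is passed explicitly, and call sites that have none at hand pass null. A
short sketch (that a null scope falls back to the default table-status scope is
an assumption here; the patch does not spell it out):

    // Parse a user-supplied segment list, e.g. the CARBON_INPUT_SEGMENTS value:
    String segmentNumbers = "1,2,3";
    List<Segment> segments =
        Segment.toSegmentList(segmentNumbers.split(","), null); // null scope: assumed default
    CarbonInputFormat.setSegmentsToAccess(conf, segments);
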
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 0add279..6f65d7d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -179,7 +179,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
           context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
       String segmentsToBeDeleted =
           context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
-      List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","));
+      List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
       Set<Segment> segmentSet = new HashSet<>(
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
               .getValidAndInvalidSegments().getValidSegments());
@@ -230,7 +230,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       newMetaEntry.setUpdateStatusFileName(uniqueId);
       // Commit the removed partitions in carbon store.
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
-          Segment.toSegmentList(tobeDeletedSegs), Segment.toSegmentList(tobeUpdatedSegs));
+          Segment.toSegmentList(tobeDeletedSegs, null),
+          Segment.toSegmentList(tobeUpdatedSegs, null));
       return uniqueId;
     }
     return null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 06ada3d..7b5b8d1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -149,8 +149,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<Segment> streamSegments = null;
     // get all valid segments and set them into the configuration
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
-    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
-        segmentStatusManager.getValidAndInvalidSegments(loadMetadataDetails);
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
+        .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
     // to check whether only streaming segments access is enabled or not,
     // if access streaming segment is true then data will be read from streaming segments
     boolean accessStreamingSegments = getAccessStreamingSegments(job.getConfiguration());
@@ -158,12 +158,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       if (!accessStreamingSegments) {
         List<Segment> validSegments = segments.getValidSegments();
         streamSegments = segments.getStreamSegments();
-        streamSegments = getFilteredSegment(job, streamSegments, true);
+        streamSegments = getFilteredSegment(job, streamSegments, true, readCommittedScope);
         if (validSegments.size() == 0) {
           return getSplitsOfStreaming(job, identifier, streamSegments);
         }
         List<Segment> filteredSegmentToAccess =
-            getFilteredSegment(job, segments.getValidSegments(), true);
+            getFilteredSegment(job, segments.getValidSegments(), true, readCommittedScope);
         if (filteredSegmentToAccess.size() == 0) {
           return getSplitsOfStreaming(job, identifier, streamSegments);
         } else {
@@ -171,7 +171,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         }
       } else {
         List<Segment> filteredNormalSegments =
-            getFilteredNormalSegments(job, segments.getValidSegments(), getSegmentsToAccess(job));
+            getFilteredNormalSegments(job, segments.getValidSegments(),
+                getSegmentsToAccess(job, readCommittedScope));
         streamSegments = segments.getStreamSegments();
         if (filteredNormalSegments.size() == 0) {
           return getSplitsOfStreaming(job, identifier, streamSegments);
@@ -195,7 +196,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     validAndInProgressSegments.addAll(segments.getListOfInProgressSegments());
     // get updated filtered list
     List<Segment> filteredSegmentToAccess =
-        getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false);
+        getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false,
+            readCommittedScope);
     // Clean the updated segments from memory if the update happens on segments
     List<Segment> toBeCleanedSegments = new ArrayList<>();
     for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
@@ -291,8 +293,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    * `INPUT_SEGMENT_NUMBERS` in job configuration
    */
   private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
-      boolean validationRequired) {
-    Segment[] segmentsToAccess = getSegmentsToAccess(job);
+      boolean validationRequired, ReadCommittedScope readCommittedScope) {
+    Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
     List<Segment> segmentToAccessSet =
         new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess)));
     List<Segment> filteredSegmentToAccess = new ArrayList<>();
@@ -418,16 +420,16 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<Segment> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
 
-    List<Segment> segmentList = new ArrayList<>();
-    segmentList.add(new Segment(targetSegment, null));
-    setSegmentsToAccess(job.getConfiguration(), segmentList);
-
     try {
       carbonTable = getOrCreateCarbonTable(job.getConfiguration());
       ReadCommittedScope readCommittedScope =
           getReadCommitted(job, carbonTable.getAbsoluteTableIdentifier());
       this.readCommittedScope = readCommittedScope;
 
+      List<Segment> segmentList = new ArrayList<>();
+      segmentList.add(new Segment(targetSegment, null, readCommittedScope));
+      setSegmentsToAccess(job.getConfiguration(), segmentList);
+
       // process and resolve the expression
       Expression filter = getFilterPredicates(job.getConfiguration());
       CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
@@ -522,7 +524,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     // for each segment fetch blocks matching filter in Driver BTree
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
+            validSegments, partitionInfo, oldPartitionIdList);
     numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 
@@ -557,12 +559,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   /**
    * return valid segment to access
    */
-  public Segment[] getSegmentsToAccess(JobContext job) {
+  public Segment[] getSegmentsToAccess(JobContext job, ReadCommittedScope readCommittedScope) {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
     if (segmentString.trim().isEmpty()) {
       return new Segment[0];
     }
-    List<Segment> segments = Segment.toSegmentList(segmentString.split(","));
+    List<Segment> segments = Segment.toSegmentList(segmentString.split(","), readCommittedScope);
     return segments.toArray(new Segment[segments.size()]);
   }
 
@@ -580,15 +582,17 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
-        new SegmentStatusManager(identifier).getValidAndInvalidSegments(loadMetadataDetails);
+        new SegmentStatusManager(identifier)
+            .getValidAndInvalidSegments(loadMetadataDetails, readCommittedScope);
     Map<String, Long> blockRowCountMapping = new HashMap<>();
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
 
     // TODO: currently only batch segment is supported, add support for streaming table
-    List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments(), false);
+    List<Segment> filteredSegment =
+        getFilteredSegment(job, allSegments.getValidSegments(), false, readCommittedScope);
 
     List<ExtendedBlocklet> blocklets =
-        blockletMap.prune(filteredSegment, null, partitions, readCommittedScope);
+        blockletMap.prune(filteredSegment, null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {
       String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);

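Taken together, the driver-side flow in CarbonTableInputFormat becomes: build
the scope once, attach it while parsing and filtering the segment list, then
prune without a separate scope argument. Condensed from the methods above, with
variable names as in the patch:

    ReadCommittedScope scope = getReadCommitted(job, carbonTable.getAbsoluteTableIdentifier());
    Segment[] segmentsToAccess = getSegmentsToAccess(job, scope);      // scope-aware parsing
    List<Segment> filteredSegment =
        getFilteredSegment(job, allSegments.getValidSegments(), false, scope);
    // prune() now reads the scope from the segments themselves:
    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions);
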
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index deeeabe..213c5a5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -30,8 +30,6 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 
@@ -108,11 +106,8 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
         DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
         TableDataMap dataMap = DataMapStoreManager.getInstance()
             .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-        ReadCommittedScope readCommittedScope =
-            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier());
         List<ExtendedBlocklet> blocklets = dataMap.prune(distributable.getDistributable(),
-            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions,
-            readCommittedScope);
+            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
         for (ExtendedBlocklet blocklet : blocklets) {
           blocklet.setDataMapUniqueId(distributable.getUniqueId());
         }

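On the executor side this drops a construction entirely: the record reader no
longer builds a TableStatusReadCommittedScope of its own before pruning, since
the Segment inside each distributable already carries whatever scope the driver
attached (that the scope survives being shipped with the distributable is an
assumption here). The prune call reduces to:

    List<ExtendedBlocklet> blocklets = dataMap.prune(distributable.getDistributable(),
        dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
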
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 6087727..ae21416 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -70,7 +70,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = {
     val file = FileFactory.getCarbonFile(
       CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
@@ -88,7 +88,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Get datamaps for distributable object.
    */
-  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: CoarseGrainDataMap = new CGDataMap()
     dataMap.init(new DataMapModel(mapDistributable.getFilePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 5fa5209..329c888 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -55,9 +55,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
 
-  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
 
   override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter =
     DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 551f9e1..d33191c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -72,7 +72,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap] = {
+  override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] = {
     val file = FileFactory
       .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
@@ -89,7 +89,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Get datamap for distributable object.
    */
-  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap]= {
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[FineGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: FineGrainDataMap = new FGDataMap()
     dataMap.init(new DataMapModel(mapDistributable.getFilePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 280c20d..e19c4b7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -185,13 +185,11 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable,
-      readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = {
     ???
   }
 
-  override def getDataMaps(segment: Segment,
-      readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = {
     ???
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 5c9709c..55e9bac 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -293,9 +293,9 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
 
-  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
 
   override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = {
     new DataMapWriter(identifier, segment, writeDirectoryPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index f69e237..1f3decc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -203,7 +203,7 @@ class StreamHandoffRDD[K, V](
     val job = Job.getInstance(FileFactory.getConfiguration)
     val inputFormat = new CarbonTableInputFormat[Array[Object]]()
     val segmentList = new util.ArrayList[Segment](1)
-    segmentList.add(Segment.toSegment(handOffSegmentId))
+    segmentList.add(Segment.toSegment(handOffSegmentId, null))
     val splits = inputFormat.getSplitsOfStreaming(
       job,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 22a0112..15e82fe 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -211,9 +212,11 @@ object PartitionUtils {
         identifier.getTablePath,
         alterPartitionModel.segmentId,
         alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
-      val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file)).asJava
+      val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file, null))
+        .asJava
       if (!CarbonUpdateUtil.updateTableMetadataStatus(
-        new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId, null)).asJava),
+        new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId,
+          null, null)).asJava),
         carbonTable,
         alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString,
         true,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index dd2b097..1487277 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -52,7 +52,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
     // therefore the segment file name should be loadName#segmentFileName.segment
     val segments = loadMetaDataDetails.asScala.map {
       loadDetail =>
-        new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString
+        new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile, null).toString
     }
 
     if (segments.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 91e2335..626a172 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.readcommitter.{ReadCommittedScope, TableStatusReadCommittedScope}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index c86fe5a..86a235a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -451,7 +451,7 @@ object LoadPostAggregateListener extends OperationEventListener {
               val segment = if (currentSegmentFile != null) {
                 new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
               } else {
-                Segment.toSegment(carbonLoadModel.getSegmentId)
+                Segment.toSegment(carbonLoadModel.getSegmentId, null)
               }
               (TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
             }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 788a820..942a21b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -216,15 +216,16 @@ class SparkCarbonFileFormat extends FileFormat
 
         val model = format.createQueryModel(split, attemptContext)
 
-        var segments = new java.util.ArrayList[Segment]()
-        val seg = new Segment("null", null)
-        segments.add(seg)
         var partition : java.util.List[PartitionSpec] = new java.util.ArrayList[PartitionSpec]()
 
-
         val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
         val readCommittedScope = new LatestFilesReadCommittedScope(
           identifier.getTablePath + "/Fact/Part0/Segment_null/")
+
+        var segments = new java.util.ArrayList[Segment]()
+        val seg = new Segment("null", null, readCommittedScope)
+        segments.add(seg)
+
         val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
         if (indexFiles.size() == 0) {
           throw new SparkException("Index file not present to read the carbondata file")
@@ -236,7 +237,7 @@ class SparkCarbonFileFormat extends FileFormat
           .choose(tab, model.getFilterExpressionResolverTree)
 
         // TODO : handle the partition for CarbonFileLevelFormat
-        val prunedBlocklets = dataMapExprWrapper.prune(segments, null, readCommittedScope)
+        val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
 
         val detailInfo = prunedBlocklets.get(0).getDetailInfo
         detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)

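The reordering in SparkCarbonFileFormat is forced by the constructor change:
the LatestFilesReadCommittedScope must exist before the Segment that captures
it. In Java terms the new ordering is roughly as follows (tablePath stands in
for identifier.getTablePath()):

    // Scope first, since Segment now captures it at construction:
    ReadCommittedScope readCommittedScope =
        new LatestFilesReadCommittedScope(tablePath + "/Fact/Part0/Segment_null/");
    List<Segment> segments = new ArrayList<Segment>();
    segments.add(new Segment("null", null, readCommittedScope));
    // Pruning then takes no separate scope argument:
    List<ExtendedBlocklet> prunedBlocklets = dataMapExprWrapper.prune(segments, null);
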
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c58eb43b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index e565e68..2290941 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -528,7 +528,7 @@ object CarbonFilters {
         // In case of compaction multiple segments will be passed as CARBON_INPUT_SEGMENTS.
         // Therefore partitionSpec will be extracted from all segments.
         val segments = segmentNumbersFromProperty.split(",").flatMap { a =>
-          val segment = Segment.toSegment(a)
+          val segment = Segment.toSegment(a, null)
           val segmentFile = new SegmentFileStore(table.getTablePath, segment.getSegmentFileName)
           segmentFile.getPartitionSpecs.asScala
         }