You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/04/05 13:27:47 UTC

[3/3] carbondata git commit: [CARBONDATA-2313] Support unmanaged carbon table read and write

[CARBONDATA-2313] Support unmanaged carbon table read and write

Carbon SDK writer will take the input data and write back the carbondata
and carbonindex files in the path specified.
This output doesn't have a metadata folder, so it is called an unmanaged
carbon table.

This can be read by creating external table in the location of sdk
writer output path.

Please refer to TestUnmanagedCarbonTable.scala for the example scenario.

Load, insert, compaction, alter, IUD, and other such features are blocked for
unmanaged tables.

Co-authored-by: sounakr <so...@gmail.com>

This closes #2131


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/280a4003
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/280a4003
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/280a4003

Branch: refs/heads/master
Commit: 280a4003a6b68b286beac6dc31ff84772d5b5b84
Parents: 5508460
Author: ajantha-bhat <aj...@gmail.com>
Authored: Thu Apr 5 15:24:36 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Apr 5 18:56:01 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  42 ++-
 .../core/datamap/dev/DataMapFactory.java        |   6 +-
 .../datamap/dev/expr/AndDataMapExprWrapper.java |  11 +-
 .../datamap/dev/expr/DataMapExprWrapper.java    |   4 +-
 .../dev/expr/DataMapExprWrapperImpl.java        |   8 +-
 .../datamap/dev/expr/OrDataMapExprWrapper.java  |  11 +-
 .../core/indexstore/BlockletDetailsFetcher.java |  13 +-
 .../indexstore/SegmentPropertiesFetcher.java    |   5 +-
 .../blockletindex/BlockletDataMapFactory.java   |  52 +--
 .../core/metadata/schema/SchemaReader.java      |   9 +-
 .../core/metadata/schema/table/CarbonTable.java |  17 +
 .../schema/table/CarbonTableBuilder.java        |  15 +-
 .../core/metadata/schema/table/TableInfo.java   |  17 +
 .../LatestFilesReadCommittedScope.java          | 155 ++++++++
 .../ReadCommittedIndexFileSnapShot.java         |  46 +++
 .../core/readcommitter/ReadCommittedScope.java  |  46 +++
 .../TableStatusReadCommittedScope.java          |  79 ++++
 .../executor/impl/AbstractQueryExecutor.java    |   3 +-
 .../SegmentUpdateStatusManager.java             |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  17 +-
 .../examples/MinMaxIndexDataMapFactory.java     |   9 +-
 .../lucene/LuceneCoarseGrainDataMapFactory.java |   9 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  10 +-
 .../hadoop/api/CarbonFileInputFormat.java       |  13 +-
 .../hadoop/api/CarbonInputFormat.java           |  10 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  45 ++-
 .../hadoop/api/DistributableDataMapFormat.java  |  15 +-
 .../createTable/TestUnmanagedCarbonTable.scala  | 369 +++++++++++++++++++
 .../testsuite/datamap/CGDataMapTestCase.scala   |   6 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   5 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |   8 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  11 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |   5 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   4 +-
 .../org/apache/spark/util/PartitionUtils.scala  |   2 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |   8 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   2 +
 .../org/apache/spark/sql/CarbonSource.scala     |  15 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |   5 +
 .../CarbonAlterTableCompactionCommand.scala     |   4 +
 .../CarbonDeleteLoadByIdCommand.scala           |   5 +
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   5 +
 .../management/CarbonLoadDataCommand.scala      |   3 +
 .../management/CarbonShowLoadsCommand.scala     |   4 +
 .../CarbonProjectForDeleteCommand.scala         |   5 +
 .../CarbonProjectForUpdateCommand.scala         |   4 +
 .../table/CarbonCreateTableCommand.scala        |   4 +
 .../datasources/SparkCarbonFileFormat.scala     |   5 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  12 +
 .../spark/sql/hive/CarbonFileMetastore.scala    |  87 ++++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  15 +-
 .../loading/CarbonDataLoadConfiguration.java    |  10 +
 .../loading/DataLoadProcessBuilder.java         |   2 +
 .../loading/model/CarbonLoadModel.java          |  17 +
 .../loading/model/CarbonLoadModelBuilder.java   |   5 +-
 .../processing/loading/model/LoadOption.java    |   5 +-
 .../store/CarbonFactDataHandlerModel.java       |  10 +-
 .../carbondata/sdk/file/CarbonReader.java       |   4 +-
 .../sdk/file/CarbonReaderBuilder.java           |   6 +-
 .../sdk/file/CarbonWriterBuilder.java           |  36 +-
 .../sdk/file/CSVUnManagedCarbonWriterTest.java  | 277 ++++++++++++++
 .../carbondata/sdk/file/CarbonReaderTest.java   |   2 +-
 62 files changed, 1468 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 81571ce..6689e3b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.events.OperationContext;
@@ -79,26 +80,30 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param segments
    * @param filterExp
+   * @param readCommittedScope
    * @return
    */
   public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp,
-      List<PartitionSpec> partitions) throws IOException {
+      List<PartitionSpec> partitions, ReadCommittedScope readCommittedScope) throws IOException {
     List<ExtendedBlocklet> blocklets = new ArrayList<>();
     SegmentProperties segmentProperties;
     for (Segment segment : segments) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       // if filter is not passed then return all the blocklets
       if (filterExp == null) {
-        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions,
+            readCommittedScope);
       } else {
-        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
-        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
+        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
+        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment,
+            readCommittedScope);
         for (DataMap dataMap : dataMaps) {
           pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
         }
       }
-      blocklets.addAll(addSegmentId(blockletDetailsFetcher
-          .getExtendedBlocklets(pruneBlocklets, segment), segment.getSegmentNo()));
+      blocklets.addAll(addSegmentId(
+          blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment, readCommittedScope),
+          segment.getSegmentNo()));
     }
     return blocklets;
   }
@@ -138,19 +143,20 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param distributable
    * @param filterExp
+   * @param readCommittedScope
    * @return
    */
   public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
-      FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException {
+      FilterResolverIntf filterExp, List<PartitionSpec> partitions,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     List<Blocklet> blocklets = new ArrayList<>();
-    List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
+    List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable, readCommittedScope);
     for (DataMap dataMap : dataMaps) {
-      blocklets.addAll(
-          dataMap.prune(
-              filterExp,
-              segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()),
-              partitions));
+      blocklets.addAll(dataMap.prune(filterExp,
+          segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment(),
+              readCommittedScope),
+          partitions));
     }
     BlockletSerializer serializer = new BlockletSerializer();
     String writePath =
@@ -160,8 +166,8 @@ public final class TableDataMap extends OperationEventListener {
       FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
     }
     for (Blocklet blocklet : blocklets) {
-      ExtendedBlocklet detailedBlocklet =
-          blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment());
+      ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher
+          .getExtendedBlocklet(blocklet, distributable.getSegment(), readCommittedScope);
       if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
         String blockletwritePath =
             writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
@@ -208,14 +214,16 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param segments
    * @param filterExp
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
+  public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     List<Segment> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (Segment segment : segments) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
       for (DataMap dataMap : dataMaps) {
         if (dataMap.isScanRequired(filterExp)) {
           // If any one task in a given segment contains the data that means the segment need to

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 48038b7..d27b255 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.events.Event;
 
 /**
@@ -47,12 +48,13 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Get the datamap for segmentid
    */
-  List<T> getDataMaps(Segment segment) throws IOException;
+  List<T> getDataMaps(Segment segment, ReadCommittedScope readCommittedScope) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<T> getDataMaps(DataMapDistributable distributable) throws IOException;
+  List<T> getDataMaps(DataMapDistributable distributable, ReadCommittedScope readCommittedScope)
+      throws IOException;
 
   /**
    * Get all distributable objects of a segmentid

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
index 74469d7..850e08a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -44,10 +45,12 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper {
     this.resolverIntf = resolverIntf;
   }
 
-  @Override public List<ExtendedBlocklet> prune(List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
-    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+  @Override
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
+    List<ExtendedBlocklet> rightPrune =
+        right.prune(segments, partitionsToPrune, readCommittedScope);
     List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
     for (ExtendedBlocklet blocklet : leftPrune) {
       if (rightPrune.contains(blocklet)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
index 14cfc33..b5fb173 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -36,7 +37,8 @@ public interface DataMapExprWrapper extends Serializable {
    * It get the blocklets from each leaf node datamap and apply expressions on the blocklets
    * using list of segments, it is used in case on non distributable datamap.
    */
-  List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+  List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index c6b011c..ffd4f80 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 public class DataMapExprWrapperImpl implements DataMapExprWrapper {
@@ -45,9 +46,10 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
     this.uniqueId = UUID.randomUUID().toString();
   }
 
-  @Override public List<ExtendedBlocklet> prune(List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
-    return dataMap.prune(segments, expression, partitionsToPrune);
+  @Override
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    return dataMap.prune(segments, expression, partitionsToPrune, readCommittedScope);
   }
 
   @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
index 37cd5dd..0667d0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -46,10 +47,12 @@ public class OrDataMapExprWrapper implements DataMapExprWrapper {
     this.resolverIntf = resolverIntf;
   }
 
-  @Override public List<ExtendedBlocklet> prune(List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
-    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+  @Override
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
+    List<ExtendedBlocklet> rightPrune =
+        right.prune(segments, partitionsToPrune, readCommittedScope);
     Set<ExtendedBlocklet> andBlocklets = new HashSet<>();
     andBlocklets.addAll(leftPrune);
     andBlocklets.addAll(rightPrune);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 58c11db..cf283f2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * Fetches the detailed blocklet which has more information to execute the query
@@ -31,10 +32,12 @@ public interface BlockletDetailsFetcher {
    *
    * @param blocklets
    * @param segment
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
+  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException;
 
   /**
@@ -42,17 +45,21 @@ public interface BlockletDetailsFetcher {
    *
    * @param blocklet
    * @param segment
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
+  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException;
 
   /**
    * Get all the blocklets in a segment
    *
    * @param segment
+   * @param readCommittedScope
    * @return
    */
-  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
+  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
+      ReadCommittedScope readCommittedScope)
       throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
index 6f94be5..d464083 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * Fetches the detailed segmentProperties which has more information to execute the query
@@ -30,8 +31,10 @@ public interface SegmentPropertiesFetcher {
   /**
    * get the Segment properties based on the SegmentID.
    * @param segmentId
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  SegmentProperties getSegmentProperties(Segment segment) throws IOException;
+  SegmentProperties getSegmentProperties(Segment segment, ReadCommittedScope readCommittedScope)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 9674958..2425c4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -82,29 +83,23 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     throw new UnsupportedOperationException("not implemented");
   }
 
-  @Override
-  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
+  @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
-  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
-      Segment segment) throws IOException {
+  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    if (readCommittedScope == null) {
+      throw new IOException("readCommittedScope is null. Internal error");
+    }
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      Map<String, String> indexFiles;
-      if (segment.getSegmentFileName() == null) {
-        String path =
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-        indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
-      } else {
-        SegmentFileStore fileStore =
-            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-        indexFiles = fileStore.getIndexFiles();
-      }
+      Map<String, String> indexFiles = readCommittedScope.getCommittedIndexFile(segment);
       for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
         Path indexFile = new Path(indexFileEntry.getKey());
         tableBlockIndexUniqueIdentifiers.add(
@@ -122,7 +117,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
    * default datamap.
    */
   @Override
-  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
+  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     // If it is already detailed blocklet then type cast and return same
@@ -133,7 +129,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       return detailedBlocklets;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
       detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
@@ -142,13 +138,14 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
+  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     if (blocklet instanceof ExtendedBlocklet) {
       return (ExtendedBlocklet) blocklet;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
     return getExtendedBlocklet(identifiers, blocklet);
   }
 
@@ -228,7 +225,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
@@ -264,8 +262,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return null;
   }
 
-  @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
-    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
+  @Override public SegmentProperties getSegmentProperties(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
     assert (dataMaps.size() > 0);
     CoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
     assert (coarseGrainDataMap instanceof BlockletDataMap);
@@ -273,12 +272,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return dataMap.getSegmentProperties();
   }
 
-  @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
-      throws IOException {
+  @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<Blocklet> blocklets = new ArrayList<>();
-    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
+    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
     for (CoarseGrainDataMap dataMap : dataMaps) {
-      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segment), partitions));
+      blocklets.addAll(
+          dataMap.prune(null, getSegmentProperties(segment, readCommittedScope), partitions));
     }
     return blocklets;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 54814cd..8692f13 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -80,14 +80,13 @@ public class SchemaReader {
         identifier.getTablePath());
   }
 
-
-  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
-      throws IOException {
+  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
+      boolean isCarbonFileProvider) throws IOException {
     // This routine is going to infer schema from the carbondata file footer
     // Convert the ColumnSchema -> TableSchema -> TableInfo.
     // Return the TableInfo.
-    org.apache.carbondata.format.TableInfo tableInfo =
-        CarbonUtil.inferSchema(identifier.getTablePath(), identifier.getTableName());
+    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+        .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider);
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
     TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         tableInfo, identifier.getDatabaseName(), identifier.getTableName(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 9d50048..d3eab6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -143,6 +143,14 @@ public class CarbonTable implements Serializable {
 
   private boolean hasDataMapSchema;
 
+  /**
+   * The boolean field which indicates whether the data was written for an
+   * UnManaged Table or a Managed Table. The difference between managed and
+   * unManaged tables is that an unManaged Table will not contain any Metadata
+   * folder and consequently no TableStatus or Schema files.
+   */
+  private boolean isUnManagedTable;
+
   private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -241,6 +249,7 @@ public class CarbonTable implements Serializable {
     table.blockSize = tableInfo.getTableBlockSizeInMB();
     table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
     table.tableUniqueName = tableInfo.getTableUniqueName();
+    table.setUnManagedTable(tableInfo.isUnManagedTable());
     table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
     table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
     if (tableInfo.getFactTable().getBucketingInfo() != null) {
@@ -990,4 +999,12 @@ public class CarbonTable implements Serializable {
   public static CarbonTableBuilder builder() {
     return new CarbonTableBuilder();
   }
+
+  public boolean isUnManagedTable() {
+    return isUnManagedTable;
+  }
+
+  public void setUnManagedTable(boolean unManagedTable) {
+    isUnManagedTable = unManagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
index 27808f8..82d0246 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
@@ -28,16 +28,15 @@ public class CarbonTableBuilder {
   private String tableName;
   private String databaseName;
   private String tablePath;
+  private boolean unManagedTable;
   private TableSchema tableSchema;
 
   public CarbonTableBuilder tableName(String tableName) {
-    Objects.requireNonNull(tableName, "tableName should not be null");
     this.tableName = tableName;
     return this;
   }
 
   public CarbonTableBuilder databaseName(String databaseName) {
-    Objects.requireNonNull(databaseName, "databaseName should not be null");
     this.databaseName = databaseName;
     return this;
   }
@@ -48,6 +47,13 @@ public class CarbonTableBuilder {
     return this;
   }
 
+
+  public CarbonTableBuilder isUnManagedTable(boolean isUnManagedTable) {
+    Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null");
+    this.unManagedTable = isUnManagedTable;
+    return this;
+  }
+
   public CarbonTableBuilder tableSchema(TableSchema tableSchema) {
     Objects.requireNonNull(tableSchema, "tableSchema should not be null");
     this.tableSchema = tableSchema;
@@ -55,16 +61,17 @@ public class CarbonTableBuilder {
   }
 
   public CarbonTable build() {
-    Objects.requireNonNull(tableName, "tableName should not be null");
-    Objects.requireNonNull(databaseName, "databaseName should not be null");
     Objects.requireNonNull(tablePath, "tablePath should not be null");
     Objects.requireNonNull(tableSchema, "tableSchema should not be null");
+    Objects.requireNonNull(unManagedTable, "UnManaged Table should not be null");
+
 
     TableInfo tableInfo = new TableInfo();
     tableInfo.setDatabaseName(databaseName);
     tableInfo.setTableUniqueName(databaseName + "_" + tableName);
     tableInfo.setFactTable(tableSchema);
     tableInfo.setTablePath(tablePath);
+    tableInfo.setUnManagedTable(unManagedTable);
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0));
     return CarbonTable.buildFromTableInfo(tableInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 47e09d0..3e7ea62 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -77,6 +77,14 @@ public class TableInfo implements Serializable, Writable {
    */
   private String tablePath;
 
+  /**
+   * The boolean field which indicates whether the data was written for an
+   * UnManaged Table or a Managed Table. The difference between managed and
+   * unManaged tables is that an unManaged Table will not contain any Metadata
+   * folder and consequently no TableStatus or Schema files.
+   */
+  private boolean isUnManagedTable;
+
   // this identifier is a lazy field which will be created when it is used first time
   private AbsoluteTableIdentifier identifier;
 
@@ -240,6 +248,7 @@ public class TableInfo implements Serializable, Writable {
     factTable.write(out);
     out.writeLong(lastUpdatedTime);
     out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
+    out.writeBoolean(isUnManagedTable);
     boolean isChildSchemaExists =
         null != dataMapSchemaList && dataMapSchemaList.size() > 0;
     out.writeBoolean(isChildSchemaExists);
@@ -267,6 +276,7 @@ public class TableInfo implements Serializable, Writable {
     this.factTable.readFields(in);
     this.lastUpdatedTime = in.readLong();
     this.tablePath = in.readUTF();
+    this.isUnManagedTable = in.readBoolean();
     boolean isChildSchemaExists = in.readBoolean();
     this.dataMapSchemaList = new ArrayList<>();
     if (isChildSchemaExists) {
@@ -320,4 +330,11 @@ public class TableInfo implements Serializable, Writable {
     return parentRelationIdentifiers;
   }
 
+  public boolean isUnManagedTable() {
+    return isUnManagedTable;
+  }
+
+  public void setUnManagedTable(boolean unManagedTable) {
+    isUnManagedTable = unManagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
new file mode 100644
index 0000000..631d12c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.readcommitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * This is a readCommittedScope for unmanaged carbon table
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public class LatestFilesReadCommittedScope implements ReadCommittedScope {
+
+  private String carbonFilePath;
+  private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
+  private LoadMetadataDetails[] loadMetadataDetails;
+  public LatestFilesReadCommittedScope(String path) {
+    this.carbonFilePath = path;
+    try {
+      takeCarbonIndexFileSnapShot();
+    } catch (IOException ex) {
+      throw new RuntimeException("Error while taking index snapshot", ex);
+    }
+  }
+
+  private void prepareLoadMetadata() {
+    int loadCount = 0;
+    Map<String, List<String>> snapshotMap =
+        this.readCommittedIndexFileSnapShot.getSegmentIndexFileMap();
+    LoadMetadataDetails[] loadMetadataDetailsArray = new LoadMetadataDetails[snapshotMap.size()];
+    String segmentID;
+    for (Map.Entry<String, List<String>> entry : snapshotMap.entrySet()) {
+      segmentID = entry.getKey();
+      LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+      long timeSet;
+      try {
+        timeSet = Long.parseLong(segmentID);
+      } catch (NumberFormatException nu) {
+        timeSet = 0;
+      }
+      loadMetadataDetails.setLoadEndTime(timeSet);
+      loadMetadataDetails.setLoadStartTime(timeSet);
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
+      loadMetadataDetails.setLoadName(segmentID);
+      loadMetadataDetailsArray[loadCount++] = loadMetadataDetails;
+    }
+    this.loadMetadataDetails = loadMetadataDetailsArray;
+  }
+
+  @Override public LoadMetadataDetails[] getSegmentList() throws IOException {
+    try {
+      if (loadMetadataDetails == null) {
+        takeCarbonIndexFileSnapShot();
+      }
+      return loadMetadataDetails;
+
+    } catch (IOException ex) {
+      throw new IOException("Problem encountered while reading the Table Status file.", ex);
+    }
+  }
+
+  @Override public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
+    Map<String, String> indexFileStore = new HashMap<>();
+    Map<String, List<String>> snapShot = readCommittedIndexFileSnapShot.getSegmentIndexFileMap();
+    String segName;
+    if (segment.getSegmentNo() != null) {
+      segName = segment.getSegmentNo();
+    } else {
+      segName = segment.getSegmentFileName();
+    }
+    List<String> index = snapShot.get(segName);
+    for (String indexPath : index) {
+      indexFileStore.put(indexPath, null);
+    }
+    return indexFileStore;
+  }
+
+  private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
+    if (indexFilePath.contains("/Fact/Part0/Segment_")) {
+      // This is CarbonFile case where the Index files are present inside the Segment Folder
+      // So the Segment has to be extracted from the path not from the CarbonIndex file.
+      String segString = indexFilePath.substring(0, indexFilePath.lastIndexOf("/") + 1);
+      String segName = segString
+          .substring(segString.lastIndexOf("_") + 1, segString.lastIndexOf("/"));
+      return segName;
+    } else {
+      String fileName = carbonIndexFileName;
+      String segId = fileName.substring(fileName.lastIndexOf("-") + 1, fileName.lastIndexOf("."));
+      return segId;
+    }
+  }
+
+  @Override public void takeCarbonIndexFileSnapShot() throws IOException {
+    // Read the current file Path get the list of indexes from the path.
+    CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
+    Map<String, List<String>> indexFileStore = new HashMap<>();
+    if (file.isDirectory()) {
+      CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
+      for (int i = 0; i < carbonIndexFiles.length; i++) {
+        // TODO. If Required to support merge index, then this code has to be modified.
+        // TODO. Nested File Paths.
+        if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+          // Get Segment Name from the IndexFile.
+          String segId =
+              getSegmentID(carbonIndexFiles[i].getName(), carbonIndexFiles[i].getAbsolutePath());
+          // TODO. During Partition table handling, place Segment File Name.
+          List<String> indexList;
+          if (indexFileStore.get(segId) == null) {
+            indexList = new ArrayList<>(1);
+          } else {
+            // Entry is already present.
+            indexList = indexFileStore.get(segId);
+          }
+          indexList.add(carbonIndexFiles[i].getAbsolutePath());
+          indexFileStore.put(segId, indexList);
+        }
+      }
+      ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
+          new ReadCommittedIndexFileSnapShot(indexFileStore);
+      this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
+      prepareLoadMetadata();
+    } else {
+      throw new IOException("Path is not pointing to directory");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
new file mode 100644
index 0000000..4491a29
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.readcommitter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This class is going to save the Index files whose snapshot is taken
+ * from the readCommitter Interface.
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Evolving
+public class ReadCommittedIndexFileSnapShot {
+
+  /**
+   * Segment Numbers are mapped with list of Index Files.
+   */
+  private Map<String, List<String>> segmentIndexFileMap;
+
+  public ReadCommittedIndexFileSnapShot(Map<String, List<String>> segmentIndexFileMap) {
+    this.segmentIndexFileMap = segmentIndexFileMap;
+  }
+
+  public Map<String, List<String>> getSegmentIndexFileMap() {
+    return segmentIndexFileMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
new file mode 100644
index 0000000..f6ba78e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.readcommitter;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+
+/**
+ * ReadCommitted interface that defines a read scope.
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public interface ReadCommittedScope {
+
+  public LoadMetadataDetails[] getSegmentList() throws IOException;
+
+  /**
+   * @param segment
+   * @return map of Absolute path of index file as key and null as value -- without mergeIndex
+   * map of AbsolutePath with fileName of MergeIndex parent file as key and mergeIndexFileName
+   *                                                             as value -- with mergeIndex
+   * @throws IOException
+   */
+  public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException ;
+
+  public void takeCarbonIndexFileSnapShot() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
new file mode 100644
index 0000000..4f54241
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.readcommitter;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * ReadCommittedScope for the managed carbon table
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public class TableStatusReadCommittedScope implements ReadCommittedScope {
+  private LoadMetadataDetails[] loadMetadataDetails;
+  private AbsoluteTableIdentifier identifier;
+
+  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) throws IOException {
+    this.identifier = identifier;
+    takeCarbonIndexFileSnapShot();
+  }
+
+  @Override public LoadMetadataDetails[] getSegmentList() throws IOException {
+    try {
+      if (loadMetadataDetails == null) {
+        takeCarbonIndexFileSnapShot();
+      }
+      return loadMetadataDetails;
+
+    } catch (IOException ex) {
+      throw new IOException("Problem encountered while reading the Table Status file.", ex);
+    }
+  }
+
+  @Override public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
+    Map<String, String> indexFiles;
+    if (segment.getSegmentFileName() == null) {
+      String path =
+          CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+      indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+    } else {
+      SegmentFileStore fileStore =
+          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+      indexFiles = fileStore.getIndexFiles();
+    }
+    return indexFiles;
+  }
+
+  @Override public void takeCarbonIndexFileSnapShot() throws IOException {
+    // Only Segment Information is updated.
+    // File information will be fetched on the fly according to the fetched segment info.
+    this.loadMetadataDetails = SegmentStatusManager
+        .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 06c255e..676976a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -308,7 +308,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
             queryModel.getProjectionDimensions(), tableBlockDimensions,
             segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size());
     blockExecutionInfo.setBlockId(
-        CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId));
+        CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
+            queryModel.getTable().getTableInfo().isUnManagedTable()));
     blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 308fe30..0e2976a 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -240,7 +240,7 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId);
+    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, false);
     String tupleId;
     if (isPartitionTable) {
       tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8e21d46..09692e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2336,10 +2336,14 @@ public final class CarbonUtil {
    *
    * @return table info containing the schema
    */
-  public static org.apache.carbondata.format.TableInfo inferSchema(
-      String carbonDataFilePath, String tableName) throws IOException {
-    List<String> filePaths =
-        getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+  public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDataFilePath,
+      String tableName, boolean isCarbonFileProvider) throws IOException {
+    List<String> filePaths;
+    if (isCarbonFileProvider) {
+      filePaths = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+    } else {
+      filePaths = getFilePathExternalFilePath(carbonDataFilePath);
+    }
     String fistFilePath = null;
     try {
       fistFilePath = filePaths.get(0);
@@ -2910,13 +2914,14 @@ public final class CarbonUtil {
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
-      String segmentId) {
+      String segmentId, boolean isUnmangedTable) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
+
     if (filePath.startsWith(tablePath)) {
       String factDir = CarbonTablePath.getFactDir(tablePath);
-      if (filePath.startsWith(factDir)) {
+      if (filePath.startsWith(factDir) || isUnmangedTable) {
         blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
             + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 45dee2a..a2f92c9 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactor
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
@@ -61,10 +62,13 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
    * getDataMaps Factory method Initializes the Min Max Data Map and returns.
    *
    * @param segment
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment)
+  @Override
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     List<CoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxIndexDataMap.
@@ -101,7 +105,8 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   @Override public void clear() {
   }
 
-  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index 04160c0..7308841 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * FG level of lucene DataMap
@@ -43,7 +44,8 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
    * Get the datamap for segmentid
    */
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<CoarseGrainDataMap> lstDataMap = new ArrayList<>();
     CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
     try {
@@ -62,9 +64,10 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
    * Get datamaps for distributable object.
    */
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
-    return getDataMaps(distributable.getSegment());
+    return getDataMaps(distributable.getSegment(), readCommittedScope);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index a9376bc..23c9928 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * CG level of lucene DataMap
@@ -38,8 +39,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
   /**
    * Get the datamap for segmentid
    */
-  @Override
-  public List<FineGrainDataMap> getDataMaps(Segment segment) throws IOException {
+  @Override public List<FineGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<FineGrainDataMap> lstDataMap = new ArrayList<>();
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
     try {
@@ -58,9 +59,10 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
    * Get datamaps for distributable object.
    */
   @Override
-  public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
-    return getDataMaps(distributable.getSegment());
+    return getDataMaps(distributable.getSegment(), readCommittedScope);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index c352a95..214e534 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -36,6 +36,8 @@ import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -78,7 +80,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
             .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
         if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
           TableInfo tableInfoInfer =
-              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration), true);
           localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
         } else {
           localCarbonTable =
@@ -114,6 +116,8 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       // get all valid segments and set them into the configuration
       // check for externalTable segment (Segment_null)
       // process and resolve the expression
+      ReadCommittedScope readCommittedScope = new LatestFilesReadCommittedScope(
+          identifier.getTablePath() + "/Fact/Part0/Segment_null/");
       Expression filter = getFilterPredicates(job.getConfiguration());
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       // this will be null in case of corrupt schema file.
@@ -138,7 +142,8 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         }
         // do block filtering and get split
         List<InputSplit> splits =
-            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null,
+                readCommittedScope);
 
         return splits;
       }
@@ -156,7 +161,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
    */
   private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList) throws IOException {
+      List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
 
     numSegments = validSegments.size();
     List<InputSplit> result = new LinkedList<InputSplit>();
@@ -170,7 +175,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     // for each segment fetch blocks matching filter in Driver BTree
     List<CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+            validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
     numBlocks = dataBlocksOfSegment.size();
     for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index f85e0e9..be97d05 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -100,6 +101,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       "mapreduce.input.carboninputformat.filter.predicate";
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
   private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+  private static final String UNMANAGED_TABLE = "mapreduce.input.carboninputformat.unmanaged";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
   private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
@@ -160,6 +162,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
 
+  public static void setUnmanagedTable(Configuration configuration, boolean isUnmanagedTable) {
+    configuration.set(UNMANAGED_TABLE, String.valueOf(isUnmanagedTable));
+  }
+
   public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
     configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
   }
@@ -332,7 +338,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
       CarbonTable carbonTable, FilterResolverIntf resolver,
       BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList) throws IOException {
+      List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
@@ -356,7 +362,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       // Apply expression on the blocklets.
       prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     } else {
-      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune, readCommittedScope);
     }
 
     List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index b4a5c5e..06ada3d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -45,6 +45,9 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.mutate.data.BlockMappingVO;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
@@ -89,10 +92,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+  private static final String CARBON_UNMANAGED_TABLE =
+      "mapreduce.input.carboninputformat.unmanaged";
   public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
+  private ReadCommittedScope readCommittedScope;
 
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
@@ -128,12 +134,14 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-    LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
-        .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
+
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
+    this.readCommittedScope = getReadCommitted(job, identifier);
+    LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
+
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
     List<Segment> invalidSegments = new ArrayList<>();
@@ -413,7 +421,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<Segment> segmentList = new ArrayList<>();
     segmentList.add(new Segment(targetSegment, null));
     setSegmentsToAccess(job.getConfiguration(), segmentList);
+
     try {
+      carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+      ReadCommittedScope readCommittedScope =
+          getReadCommitted(job, carbonTable.getAbsoluteTableIdentifier());
+      this.readCommittedScope = readCommittedScope;
+
       // process and resolve the expression
       Expression filter = getFilterPredicates(job.getConfiguration());
       CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
@@ -508,7 +522,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     // for each segment fetch blocks matching filter in Driver BTree
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+            validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
     numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 
@@ -559,8 +573,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       List<PartitionSpec> partitions) throws IOException {
     AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
     TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
-    LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
-        .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
+
+    ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier);
+    LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
+
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
@@ -571,7 +587,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     // TODO: currently only batch segment is supported, add support for streaming table
     List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments(), false);
 
-    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions);
+    List<ExtendedBlocklet> blocklets =
+        blockletMap.prune(filteredSegment, null, partitions, readCommittedScope);
     for (ExtendedBlocklet blocklet : blocklets) {
       String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);
@@ -601,4 +618,18 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
     return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
-}
+
+  public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifier identifier)
+      throws IOException {
+    if (readCommittedScope == null) {
+      ReadCommittedScope readCommittedScope;
+      if (job.getConfiguration().getBoolean(CARBON_UNMANAGED_TABLE, false)) {
+        readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+      } else {
+        readCommittedScope = new TableStatusReadCommittedScope(identifier);
+      }
+      this.readCommittedScope = readCommittedScope;
+    }
+    return readCommittedScope;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 3f19a3f..deeeabe 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -30,6 +30,8 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 
@@ -101,16 +103,17 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
       private Iterator<ExtendedBlocklet> blockletIterator;
       private ExtendedBlocklet currBlocklet;
 
-      @Override
-      public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
         DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
         TableDataMap dataMap = DataMapStoreManager.getInstance()
             .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-        List<ExtendedBlocklet> blocklets = dataMap.prune(
-            distributable.getDistributable(),
-            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
-        for (ExtendedBlocklet blocklet: blocklets) {
+        ReadCommittedScope readCommittedScope =
+            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier());
+        List<ExtendedBlocklet> blocklets = dataMap.prune(distributable.getDistributable(),
+            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions,
+            readCommittedScope);
+        for (ExtendedBlocklet blocklet : blocklets) {
           blocklet.setDataMapUniqueId(distributable.getUniqueId());
         }
         blockletIterator = blocklets.iterator();