Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/04/05 13:27:45 UTC

[1/3] carbondata git commit: [CARBONDATA-2313] Support unmanaged carbon table read and write

Repository: carbondata
Updated Branches:
  refs/heads/master 550846029 -> 280a4003a


http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index 14802e6..5f7ef6a 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -43,7 +43,7 @@ public class CarbonReaderTest {
 
     TestUtil.writeFilesAndVerify(new Schema(fields), path, true);
 
-    CarbonReader reader = CarbonReader.builder(path)
+    CarbonReader reader = CarbonReader.builder(path, "_temp")
         .projection(new String[]{"name", "age"}).build();
 
     int i = 0;
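
The hunk above tracks the SDK reader API change in this commit: CarbonReader.builder now takes a second argument in addition to the output path (the test passes "_temp", presumably a table name for the read). A minimal read sketch, assuming the SDK reader's generic build()/hasNext()/readNextRow()/close() methods and that projected rows come back as Object[]; only the builder, projection and build calls appear in the hunk, and the path value below is illustrative:

    import org.apache.carbondata.sdk.file.CarbonReader

    // Directory that already contains SDK writer output (illustrative value).
    val path = "./target/WriterOutput"

    // The second builder argument ("_temp" in the test) is assumed to be a table name for the read.
    val reader: CarbonReader[Array[AnyRef]] = CarbonReader.builder(path, "_temp")
      .projection(Array("name", "age"))
      .build()

    while (reader.hasNext()) {
      val row = reader.readNextRow()   // one projected row: Array(name, age)
      println(row.mkString(", "))
    }
    reader.close()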


[2/3] carbondata git commit: [CARBONDATA-2313] Support unmanaged carbon table read and write

Posted by gv...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
new file mode 100644
index 0000000..bfb9471
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.{File, FileFilter}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath gives a path with '\' on Windows, but the code expects '/'. Should this be handled in the code itself?
+  writerPath = writerPath.replace("\\", "/");
+
+  // prepare SDK writer output
+  def buildTestData(persistSchema: Boolean, outputMultipleFiles: Boolean): Any = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
+            .uniqueIdentifier(
+              System.currentTimeMillis)
+            .buildWriterForCSVInput()
+        } else {
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
+            .uniqueIdentifier(
+              System.currentTimeMillis).withBlockSize(1).withBlockletSize(1)
+            .buildWriterForCSVInput()
+        }
+      var i = 0
+      var row = 3
+      if (outputMultipleFiles) {
+        row = 1000000
+      }
+      while (i < row) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => None
+      case _ => None
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteFile(path: String, extension: String): Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  test("test create External Table with Schema with partition, should ignore schema and partition")
+  {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    // with partition
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("read unmanaged table, files written from sdk Writer Output)") {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable1 STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable1"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select name from sdkOutputTable1"), Seq(Row("robot0"),
+      Row("robot1"),
+      Row("robot2")))
+
+    checkAnswer(sql("select age from sdkOutputTable1"), Seq(Row(0), Row(1), Row(2)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where age > 1 and age < 8"),
+      Seq(Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where name = 'robot2'"),
+      Seq(Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where name like '%obot%' limit 2"),
+      Seq(Row("robot0", 0, 0.0),
+        Row("robot1", 1, 0.5)))
+
+    checkAnswer(sql("select sum(age) from sdkOutputTable1 where name like 'robot%'"), Seq(Row(3)))
+
+    checkAnswer(sql("select count(*) from sdkOutputTable1 where name like 'robot%' "), Seq(Row(3)))
+
+    checkAnswer(sql("select count(*) from sdkOutputTable1"), Seq(Row(3)))
+
+    sql("DROP TABLE sdkOutputTable1")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Test Blocked operations for unmanaged table ") {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    //1. alter datatype
+    var exception = intercept[MalformedCarbonCommandException] {
+      sql("Alter table sdkOutputTable change age age BIGINT")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //2. Load
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE sdkOutputTable ")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //3. Datamap creation
+    exception = intercept[MalformedCarbonCommandException] {
+      sql(
+        "CREATE DATAMAP agg_sdkOutputTable ON TABLE sdkOutputTable USING \"preaggregate\" AS " +
+        "SELECT name, sum(age) FROM sdkOutputTable GROUP BY name,age")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //4. Insert Into
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("insert into table sdkOutputTable SELECT 20,'robotX',2.5")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //5. compaction
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("ALTER TABLE sdkOutputTable COMPACT 'MAJOR'")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //6. Show segments
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("Show segments for table sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //7. Delete segment by ID
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.ID IN (0)")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //8. Delete segment by date
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //9. Update Segment
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //10. Delete Segment
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM sdkOutputTable where name='robot1'").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //11. Show partition
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("Show partitions sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //12. Streaming table creation
+    // External tables don't accept table properties
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("test create External Table With Schema, should ignore the schema provided") {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    // with schema
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestData(false, false)
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      //    data source file format
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without any file should fail") {
+    buildTestData(false, false)
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      //data source file format
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output multiple files ") {
+    buildTestData(false, true)
+    assert(new File(writerPath).exists())
+    val folder = new File(writerPath)
+    val dataFiles = folder.listFiles(new FileFilter() {
+      override def accept(pathname: File): Boolean = {
+        pathname.getName
+          .endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+      }
+    })
+    Assert.assertNotNull(dataFiles)
+    Assert.assertNotEquals(1, dataFiles.length)
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+}
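
Condensed from the tests above, the end-to-end flow enabled by this commit is: write files with the SDK writer in unmanaged mode, then map them as an external Carbon table and query them through Spark SQL. A minimal sketch using only calls that appear in the suite (writerPath is illustrative; sql(...) is the QueryTest helper, i.e. spark.sql outside a test):

    import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

    val writerPath = "/tmp/WriterOutput"   // illustrative output location
    val schemaJson = """[ {"name":"string"}, {"age":"int"}, {"height":"double"} ]"""

    // 1. SDK write, flagged as an unmanaged table (no metastore schema, no segment management).
    val writer = CarbonWriter.builder()
      .withSchema(Schema.parseJson(schemaJson))
      .outputPath(writerPath)
      .unManagedTable(true)
      .uniqueIdentifier(System.currentTimeMillis)
      .buildWriterForCSVInput()
    (0 until 3).foreach { i =>
      writer.write(Array[String]("robot" + i, i.toString, (i.toDouble / 2).toString))
    }
    writer.close()

    // 2. Expose the written files as an external table and query them.
    sql(s"CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION '$writerPath'")
    sql("SELECT * FROM sdkOutputTable WHERE age > 1").show()

    // 3. Dropping the table leaves the SDK-written files untouched.
    sql("DROP TABLE sdkOutputTable")

Write-path operations (LOAD, INSERT, UPDATE, DELETE, compaction, datamap creation, SHOW SEGMENTS, delete segment) are rejected on such a table with "Unsupported operation on unmanaged table", as asserted in the blocked-operations test above.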

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b56bd6e..0c96247 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
@@ -69,7 +70,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
     val file = FileFactory.getCarbonFile(
       CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
@@ -87,8 +88,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Get datamaps for distributable object.
    */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: CoarseGrainDataMap = new CGDataMap()
     dataMap.init(new DataMapModel(mapDistributable.getFilePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index bb8d7f8..270c676 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -53,9 +54,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
-  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
   override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter =
     DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 3bfc7b9..060d06a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -25,7 +25,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.datamap.dev.{DataMapModel}
+import org.apache.carbondata.core.datamap.dev.DataMapModel
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter}
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
@@ -71,7 +72,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] = {
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap] = {
     val file = FileFactory
       .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
@@ -88,8 +89,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Get datamap for distributable object.
    */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap]= {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: FineGrainDataMap = new FGDataMap()
     dataMap.init(new DataMapModel(mapDistributable.getFilePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 32c10ef..be9cc51 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Se
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -183,9 +184,15 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable,
+      readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+    ???
+  }
 
-  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment,
+      readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+    ???
+  }
 
   override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = {
     new DataMapWriter(identifier, segment, writeDirectoryPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 1f4b7fe..65857b1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -276,9 +277,9 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
-  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
   override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = {
     new DataMapWriter(identifier, segment, writeDirectoryPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 8c433ea..d34d009 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.{TableInfo}
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
@@ -495,6 +495,8 @@ class CarbonScanRDD(
     if (partitionNames != null) {
       CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
     }
+
+    CarbonInputFormat.setUnmanagedTable(conf, tableInfo.isUnManagedTable)
     createInputFormat(conf)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 605777d..22a0112 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -158,7 +158,7 @@ object PartitionUtils {
       .createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
     CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
     val splits = format.getSplitsOfOneSegment(job, segmentId,
-      oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
+        oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
     val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
     val tableBlockInfoList = CarbonInputSplit.createBlocks(blockList.asJava)
     tableBlockInfoList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 5f78397..b9e2442 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -33,14 +33,14 @@ object CarbonSparkUtil {
 
   def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
     val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getTableName)
-        .asScala.map(x => x.getColName) // wf : may be problem
+      .asScala.map(x => x.getColName) // wf : may be problem
     val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getTableName)
-        .asScala.map(x => x.getColName)
+      .asScala.map(x => x.getColName)
     val dictionary =
       carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
-            f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-                !f.getDataType.isComplexType)
+          f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+          !f.getDataType.isComplexType)
       }
     CarbonMetaData(dimensionsAttr,
       measureAttr,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index b27a150..e29986a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -76,6 +76,8 @@ case class CarbonCountStar(
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
+    CarbonInputFormat
+      .setUnmanagedTable(job.getConfiguration, carbonTable.getTableInfo.isUnManagedTable)
     (job, carbonInputFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 693b6c8..ef23926 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -299,7 +299,10 @@ object CarbonSource {
       tableDesc.copy(storage = updatedFormat)
     } else {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
-      if (!metaStore.isReadFromHiveMetaStore) {
+      val isExternal = properties.getOrElse("isExternal", "false")
+      val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
+      tableInfo.setUnManagedTable(isUnManagedTable)
+      if (!isUnManagedTable && !metaStore.isReadFromHiveMetaStore) {
         // save to disk
         metaStore.saveToDisk(tableInfo, properties("tablePath"))
         // remove schema string from map as we don't store carbon schema to hive metastore
@@ -320,16 +323,20 @@ object CarbonSource {
       properties: Map[String, String]): Map[String, String] = {
     val model = createTableInfoFromParams(properties, dataSchema, identifier)
     val tableInfo: TableInfo = TableNewProcessor(model)
+    val isExternal = properties.getOrElse("isExternal", "false")
+    val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
+    val tablePath = properties.getOrElse("path", "")
     tableInfo.setTablePath(identifier.getTablePath)
+    tableInfo.setUnManagedTable(isUnManagedTable)
     tableInfo.setDatabaseName(identifier.getDatabaseName)
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    val map = if (metaStore.isReadFromHiveMetaStore) {
-      CarbonUtil.convertToMultiStringMap(tableInfo)
-    } else {
+    val map = if (!metaStore.isReadFromHiveMetaStore && !isUnManagedTable) {
       metaStore.saveToDisk(tableInfo, identifier.getTablePath)
       new java.util.HashMap[String, String]()
+    } else {
+      CarbonUtil.convertToMultiStringMap(tableInfo)
     }
     properties.foreach(e => map.put(e._1, e._2))
     map.put("tablepath", identifier.getTablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index a46d514..be5287f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -55,6 +55,11 @@ case class CarbonCreateDataMapCommand(
         CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
       case _ => null
     }
+
+    if (mainTable != null && mainTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) {
       if (!ifNotExistsSet) {
         throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index bccc4dd..dc96399 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -77,6 +77,10 @@ case class CarbonAlterTableCompactionCommand(
       }
       relation.carbonTable
     }
+    if (table.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     if (CarbonUtil.hasAggregationDataMap(table) ||
         (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
       // If the compaction request is of 'streaming' type then we need to generate loadCommands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 81427a1..57ccd82 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
@@ -35,6 +36,10 @@ case class CarbonDeleteLoadByIdCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index 1d76bda..7d0655d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
@@ -36,6 +37,10 @@ case class CarbonDeleteLoadByLoadDateCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f0a9a9e..3f86ca4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -153,6 +153,9 @@ case class CarbonLoadDataCommand(
     } else {
       null
     }
+    if (table.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
     // get the value of 'spark.executor.cores' from spark conf, default value is 1
     val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
     // get the value of 'carbon.number.of.cores.while.loading' from carbon properties,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 1e5885e..1e65887 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.{StringType, TimestampType}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 case class CarbonShowLoadsCommand(
     databaseNameOp: Option[String],
@@ -43,6 +44,9 @@ case class CarbonShowLoadsCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
     CarbonStore.showSegments(
       limit,
       carbonTable.getMetadataPath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 230378b..ca9a6a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -44,6 +45,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 2a92478..24ac80c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -56,6 +57,9 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data update")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index e0e51ed..65c6269 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -39,6 +39,7 @@ case class CarbonCreateTableCommand(
     tableInfo: TableInfo,
     ifNotExistsSet: Boolean = false,
     tableLocation: Option[String] = None,
+    isExternal : Boolean = false,
     createDSTable: Boolean = true,
     isVisible: Boolean = true)
   extends MetadataCommand {
@@ -89,6 +90,7 @@ case class CarbonCreateTableCommand(
       OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
       val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
+      val isUnmanaged = tableInfo.isUnManagedTable
       if (createDSTable) {
         try {
           val tablePath = tableIdentifier.getTablePath
@@ -128,6 +130,8 @@ case class CarbonCreateTableCommand(
                |  dbName "$dbName",
                |  tablePath "$tablePath",
                |  path "$tablePath",
+               |  isExternal "$isExternal",
+               |  isUnManaged "$isUnmanaged",
                |  isVisible "$isVisible"
                |  $carbonSchemaString)
                |  $partitionString
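
With this change, the DDL that CarbonCreateTableCommand generates for the backing datasource table carries the two new flags alongside the existing properties, which is what lets CarbonSource and the metastore path above distinguish an unmanaged (SDK-written, externally located) table from a managed one. For illustration only, with hypothetical values, the generated OPTIONS clause now looks like:

    OPTIONS (
      dbName "default",
      tablePath "/tmp/WriterOutput",
      path "/tmp/WriterOutput",
      isExternal "true",
      isUnManaged "true",
      isVisible "true",
      ...carbon schema string...)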

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 2daece3..788a820 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager,
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion}
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.core.reader.CarbonHeaderReader
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
@@ -222,6 +223,8 @@ class SparkCarbonFileFormat extends FileFormat
 
 
         val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
+        val readCommittedScope = new LatestFilesReadCommittedScope(
+          identifier.getTablePath + "/Fact/Part0/Segment_null/")
         val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
         if (indexFiles.size() == 0) {
           throw new SparkException("Index file not present to read the carbondata file")
@@ -233,7 +236,7 @@ class SparkCarbonFileFormat extends FileFormat
           .choose(tab, model.getFilterExpressionResolverTree)
 
         // TODO : handle the partition for CarbonFileLevelFormat
-        val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
+        val prunedBlocklets = dataMapExprWrapper.prune(segments, null, readCommittedScope)
 
         val detailInfo = prunedBlocklets.get(0).getDetailInfo
         detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index a5a96af..ad3ad2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -117,6 +117,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
+          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
           } else {
             ExecutedCommandExec(dataTypeChange) :: Nil
           }
@@ -133,6 +135,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
+          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
           } else {
             ExecutedCommandExec(addColumn) :: Nil
           }
@@ -149,6 +153,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
+          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
           } else {
             ExecutedCommandExec(dropColumn) :: Nil
           }
@@ -181,6 +187,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (isCarbonTable) {
           val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
             .lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+          if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+          }
           if (!carbonTable.isHivePartitionTable) {
             ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil
           } else {
@@ -231,6 +240,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         // if the table has 'preaggregate' DataMap, it doesn't support streaming now
         val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+        if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+          throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+        }
 
         // TODO remove this limitation later
         val property = properties.find(_._1.equalsIgnoreCase("streaming"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 7a8601a..0075c13 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.net.URI
-import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -43,11 +42,10 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -105,7 +103,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
       case Some(t) =>
         CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
       case None =>
-        readCarbonSchema(absIdentifier) match {
+        readCarbonSchema(absIdentifier,
+          parameters.getOrElse("isUnManaged", "false").toBoolean) match {
           case Some(meta) =>
             CarbonRelation(database, tableName,
               CarbonSparkUtil.createSparkMeta(meta), meta)
@@ -207,25 +206,51 @@ class CarbonFileMetastore extends CarbonMetaStore {
     true
   }
 
-  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[CarbonTable] = {
+  def isTableInMetastore(identifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Boolean = {
+    sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName)
+      .exists(_.table.equalsIgnoreCase(identifier.getTableName))
+  }
+
+
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier,
+      inferSchema: Boolean): Option[CarbonTable] = {
+
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
+    val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
     val tablePath = identifier.getTablePath
-    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
-    val fileType = FileFactory.getFileType(tableMetadataFile)
-    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-      val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
-      val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
-      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo =
+    if (inferSchema) {
+      val thriftTableInfo = schemaConverter
+        .fromWrapperToExternalTableInfo(SchemaReader.inferSchema(identifier, false),
+          dbName, tableName)
       val wrapperTableInfo =
-        schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+        schemaConverter
+          .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
+      wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
+      wrapperTableInfo.setUnManagedTable(true)
+      Some(wrapperTableInfo)
+    } else {
+      val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+      val fileType = FileFactory.getFileType(tableMetadataFile)
+      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+        val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+        val wrapperTableInfo =
+          schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+        Some(wrapperTableInfo)
+      } else {
+        None
+      }
+    }
+
+    wrapperTableInfo.map { tableInfo =>
       CarbonMetadata.getInstance().removeTable(tableUniqueName)
-      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+      CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
       metadata.carbonTables += carbonTable
-      Some(carbonTable)
-    } else {
-      None
+      carbonTable
     }
   }
 
@@ -448,6 +473,34 @@ class CarbonFileMetastore extends CarbonMetaStore {
       val tableIdentifier = TableIdentifier(tableName, Option(dbName))
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
       DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+    } else {
+      if (isUnmanagedCarbonTable(absoluteTableIdentifier, sparkSession)) {
+        removeTableFromMetadata(dbName, tableName)
+        CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+        // discard cached table info in cachedDataSourceTables
+        val tableIdentifier = TableIdentifier(tableName, Option(dbName))
+        sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+        DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+      }
+    }
+  }
+
+
+  def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Boolean = {
+    if (sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName)
+      .exists(_.table.equalsIgnoreCase(identifier.getTableName))) {
+
+      val table = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+        .getCarbonEnv().carbonMetastore
+        .getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName)
+
+      table match {
+        case null => false
+        case _ => table.get.getTableInfo.isUnManagedTable
+      }
+    } else {
+      false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 7b464f2..33eac61 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -258,6 +258,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+    var unManagedTable : Boolean = false
 
     val tableInfo = if (external) {
       // read table info from schema file in the provided table path
@@ -267,9 +268,13 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableIdentifier.table)
       val table = try {
         val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
-        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
-            provider.equalsIgnoreCase("'carbonfile'")) {
-          SchemaReader.inferSchema(identifier)
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+          if (provider.equalsIgnoreCase("'carbonfile'")) {
+            SchemaReader.inferSchema(identifier, true)
+          } else {
+            unManagedTable = true
+            SchemaReader.inferSchema(identifier, false)
+          }
         }
         else {
           SchemaReader.getTableInfo(identifier)
@@ -302,6 +307,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableComment)
       TableNewProcessor(tableModel)
     }
+    tableInfo.setUnManagedTable(unManagedTable)
     selectQuery match {
       case query@Some(q) =>
         CarbonCreateTableAsSelectCommand(
@@ -313,7 +319,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         CarbonCreateTableCommand(
           tableInfo = tableInfo,
           ifNotExistsSet = ifNotExists,
-          tableLocation = tablePath)
+          tableLocation = tablePath,
+          external)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index c21c57f..829c17e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -110,6 +110,8 @@ public class CarbonDataLoadConfiguration {
 
   private SortColumnRangeInfo sortColumnRangeInfo;
 
+  private boolean carbonUnmanagedTable;
+
   /**
    * Folder path where data should be written for this load.
    */
@@ -377,4 +379,12 @@ public class CarbonDataLoadConfiguration {
   public void setSortColumnRangeInfo(SortColumnRangeInfo sortColumnRangeInfo) {
     this.sortColumnRangeInfo = sortColumnRangeInfo;
   }
+
+  public boolean isCarbonUnmanagedTable() {
+    return carbonUnmanagedTable;
+  }
+
+  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
+    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 21a4b3e..6d3f596 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -185,6 +185,7 @@ public final class DataLoadProcessBuilder {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
     configuration.setTableIdentifier(identifier);
+    configuration.setCarbonUnmanagedTable(loadModel.isCarbonUnmanagedTable());
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
@@ -214,6 +215,7 @@ public final class DataLoadProcessBuilder {
         loadModel.getGlobalSortPartitions());
     configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
         loadModel.getBadRecordsLocation());
+
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index ebf3cf1..fd39563 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -47,6 +47,13 @@ public class CarbonLoadModel implements Serializable {
 
   private String tablePath;
 
+  /*
+     Indicates whether the carbonTable is an unmanaged table.
+     Its location is given by tablePath, and no Metadata folder
+     is present for an unmanaged table.
+   */
+  private boolean carbonUnmanagedTable;
+
   private String csvHeader;
   private String[] csvHeaderColumns;
   private String csvDelimiter;
@@ -410,6 +417,7 @@ public class CarbonLoadModel implements Serializable {
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
     copy.tablePath = tablePath;
+    copy.carbonUnmanagedTable = carbonUnmanagedTable;
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
@@ -463,6 +471,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.tablePath = tablePath;
+    copyObj.carbonUnmanagedTable = carbonUnmanagedTable;
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
@@ -825,4 +834,12 @@ public class CarbonLoadModel implements Serializable {
     LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
     setLoadMetadataDetails(Arrays.asList(details));
   }
+
+  public boolean isCarbonUnmanagedTable() {
+    return carbonUnmanagedTable;
+  }
+
+  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
+    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 8c005c3..8eb5ed1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -60,7 +60,7 @@ public class CarbonLoadModelBuilder {
    * @return a new CarbonLoadModel instance
    */
   public CarbonLoadModel build(
-      Map<String, String> options) throws InvalidLoadOptionException, IOException {
+      Map<String, String> options, long UUID) throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
 
     if (!options.containsKey("fileheader")) {
@@ -72,10 +72,13 @@ public class CarbonLoadModelBuilder {
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
     }
     CarbonLoadModel model = new CarbonLoadModel();
+    model.setCarbonUnmanagedTable(table.isUnManagedTable());
+    model.setFactTimeStamp(UUID);
 
     // we have provided 'fileheader', so it hadoopConf can be null
     build(options, optionsFinal, model, null);
 
+
     // set default values
     model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
     model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index e605b9e..089b8c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -244,8 +244,9 @@ public class LoadOption {
       }
     }
 
-    if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
+    if (!carbonLoadModel.isCarbonUnmanagedTable() && !CarbonDataProcessorUtil
+        .isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
+            carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {
         LOG.error("CSV header in DDL is not proper."
             + " Column names in schema and CSV header are not the same.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 1d892e0..0625a66 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -366,8 +366,14 @@ public class CarbonFactDataHandlerModel {
       return paths;
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
-    String carbonDataDirectoryPath = CarbonTablePath
-        .getSegmentPath(absoluteTableIdentifier.getTablePath(), configuration.getSegmentId() + "");
+    String carbonDataDirectoryPath;
+    if (configuration.isCarbonUnmanagedTable()) {
+      carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath();
+    } else {
+      carbonDataDirectoryPath = CarbonTablePath
+          .getSegmentPath(absoluteTableIdentifier.getTablePath(),
+              configuration.getSegmentId() + "");
+    }
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 210d516..716ec2a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -84,8 +84,8 @@ public class CarbonReader<T> {
   /**
    * Return a new {@link CarbonReaderBuilder} instance
    */
-  public static CarbonReaderBuilder builder(String tablePath) {
-    return new CarbonReaderBuilder(tablePath);
+  public static CarbonReaderBuilder builder(String tablePath, String tableName) {
+    return new CarbonReaderBuilder(tablePath, tableName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 894b973..7f00b49 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -45,9 +45,11 @@ public class CarbonReaderBuilder {
   private String tablePath;
   private String[] projectionColumns;
   private Expression filterExpression;
+  private String tableName;
 
-  CarbonReaderBuilder(String tablePath) {
+  CarbonReaderBuilder(String tablePath, String tableName) {
     this.tablePath = tablePath;
+    this.tableName = tableName;
   }
 
   public CarbonReaderBuilder projection(String[] projectionColumnNames) {
@@ -63,7 +65,7 @@ public class CarbonReaderBuilder {
   }
 
   public <T> CarbonReader<T> build() throws IOException, InterruptedException {
-    CarbonTable table = CarbonTable.buildFromTablePath("_temp", tablePath);
+    CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath);
 
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
     final Job job = new Job(new Configuration());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 5be60c4..f70e165 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -55,6 +55,8 @@ public class CarbonWriterBuilder {
   private boolean persistSchemaFile;
   private int blockletSize;
   private int blockSize;
+  private boolean isUnManagedTable;
+  private long UUID;
 
   public CarbonWriterBuilder withSchema(Schema schema) {
     Objects.requireNonNull(schema, "schema should not be null");
@@ -82,9 +84,21 @@ public class CarbonWriterBuilder {
     return this;
   }
 
+  public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) {
+    Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null");
+    this.isUnManagedTable = isUnManagedTable;
+    return this;
+  }
+
+  public CarbonWriterBuilder uniqueIdentifier(long UUID) {
+    Objects.requireNonNull(UUID, "Unique Identifier should not be null");
+    this.UUID = UUID;
+    return this;
+  }
+
   public CarbonWriterBuilder withBlockSize(int blockSize) {
-    if (blockSize <= 0) {
-      throw new IllegalArgumentException("blockSize should be greater than zero");
+    if (blockSize <= 0 || blockSize > 2048) {
+      throw new IllegalArgumentException("blockSize should be between 1 MB and 2048 MB");
     }
     this.blockSize = blockSize;
     return this;
@@ -129,7 +143,7 @@ public class CarbonWriterBuilder {
     }
 
     // build LoadModel
-    return buildLoadModel(table);
+    return buildLoadModel(table, UUID);
   }
 
   /**
@@ -152,8 +166,15 @@ public class CarbonWriterBuilder {
           new StructField(field.getFieldName(), field.getDataType()),
           sortColumnsList.contains(field.getFieldName()));
     }
-    String tableName = "_tempTable";
-    String dbName = "_tempDB";
+    String tableName;
+    String dbName;
+    if (!isUnManagedTable) {
+      tableName = "_tempTable";
+      dbName = "_tempDB";
+    } else {
+      dbName = null;
+      tableName = null;
+    }
     TableSchema schema = tableSchemaBuilder.build();
     schema.setTableName(tableName);
     CarbonTable table = CarbonTable.builder()
@@ -161,6 +182,7 @@ public class CarbonWriterBuilder {
         .databaseName(dbName)
         .tablePath(path)
         .tableSchema(schema)
+        .isUnManagedTable(isUnManagedTable)
         .build();
     return table;
   }
@@ -198,13 +220,13 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table)
+  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID)
       throws InvalidLoadOptionException, IOException {
     Map<String, String> options = new HashMap<>();
     if (sortColumns != null) {
       options.put("sort_columns", Strings.mkString(sortColumns, ","));
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    return builder.build(options);
+    return builder.build(options, UUID);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
new file mode 100644
index 0000000..4bcdfff
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for unmanaged carbon table writes through {@link CSVCarbonWriter}
+ */
+public class CSVUnManagedCarbonWriterTest {
+
+  @Test
+  public void testWriteFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testWriteFilesJsonSchema() throws IOException {
+    String path = "./testWriteFilesJsonSchema";
+    FileUtils.deleteDirectory(new File(path));
+
+    String schema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"}\n")
+        .append("]")
+        .toString();
+
+    writeFilesAndVerify(Schema.parseJson(schema), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+  }
+
+  /**
+   * Invoke CarbonWriter API to write carbon files and assert that the files are written
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true to persist the schema file
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   */
+  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize) {
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(schema)
+          .unManagedTable(true)
+          .uniqueIdentifier(System.currentTimeMillis())
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
+      }
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+  }
+  
+
+  @Test
+  public void testAllPrimitiveDataType() throws IOException {
+    // TODO: write all data type and read by CarbonRecordReader to verify the content
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(new Schema(fields))
+          .uniqueIdentifier(System.currentTimeMillis())
+          .unManagedTable(true)
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34"
+        };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Blocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+
+    // TODO: implement reader to verify the number of blocklet in the file
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Block() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+
+    File segmentFolder = new File(path);
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(2, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testSortColumns() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+    // TODO: implement reader and verify the data is sorted
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testPartitionOutput() {
+    // TODO: test write data with partition
+  }
+
+  @Test
+  public void testSchemaPersistence() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+    Assert.assertTrue(new File(schemaFile).exists());
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}


[3/3] carbondata git commit: [CARBONDATA-2313] Support unmanaged carbon table read and write

Posted by gv...@apache.org.
[CARBONDATA-2313] Support unmanaged carbon table read and write

The Carbon SDK writer takes the input data and writes the carbondata
and carbonindex files to the specified path.
This output has no Metadata folder, so it is called an unmanaged
carbon table.
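
A minimal write sketch in Scala, mirroring the builder calls used by the
new CSVUnManagedCarbonWriterTest; the output folder, schema and row
values below are illustrative only, not part of this patch:

import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

object UnmanagedWriteExample {
  def main(args: Array[String]): Unit = {
    // illustrative output folder; the SDK creates no Metadata folder here
    val writerPath = new java.io.File("./sdk_output").getCanonicalPath

    val fields = new Array[Field](2)
    fields(0) = new Field("name", DataTypes.STRING)
    fields(1) = new Field("age", DataTypes.INT)

    // unManagedTable(true) writes plain carbondata/carbonindex files;
    // uniqueIdentifier becomes the fact timestamp used in the file names
    val writer = CarbonWriter.builder()
      .withSchema(new Schema(fields))
      .unManagedTable(true)
      .uniqueIdentifier(System.currentTimeMillis())
      .outputPath(writerPath)
      .buildWriterForCSVInput()

    (0 until 10).foreach(i => writer.write(Array("robot" + i, i.toString)))
    writer.close()
  }
}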

The output can be read by creating an external table whose location is
the SDK writer's output path.
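
A rough read sketch under the same assumptions; the session setup, table
name and exact DDL below are illustrative (TestUnmanagedCarbonTable.scala
is the authoritative example), and writerPath is the folder written above:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

object UnmanagedReadExample {
  def main(args: Array[String]): Unit = {
    val writerPath = new java.io.File("./sdk_output").getCanonicalPath

    // assumes the carbondata-spark2 integration jars on the classpath;
    // the store location passed here is illustrative
    val spark = SparkSession.builder()
      .master("local")
      .appName("UnmanagedReadExample")
      .getOrCreateCarbonSession("./carbon_store")

    spark.sql("DROP TABLE IF EXISTS sdkOutputTable")
    // there is no schema file under writerPath, so the schema is inferred
    // from the carbondata files and the table is registered as unmanaged
    spark.sql(s"CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' " +
      s"LOCATION '$writerPath'")
    spark.sql("SELECT name, age FROM sdkOutputTable").show()
  }
}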

Please refer to TestUnmanagedCarbonTable.scala for the example scenario.

Load, insert, compaction, alter, IUD and similar features are blocked
for unmanaged tables.
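
Continuing from the read sketch above, a blocked operation such as alter
should fail with the new error thrown from DDLStrategy (the ALTER statement
here is only an illustration):

try {
  spark.sql("ALTER TABLE sdkOutputTable ADD COLUMNS (salary DOUBLE)")
} catch {
  case e: Exception =>
    // expected: MalformedCarbonCommandException
    // "Unsupported operation on unmanaged table"
    println(e.getMessage)
}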

Co-authored-by: sounakr <so...@gmail.com>

This closes #2131


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/280a4003
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/280a4003
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/280a4003

Branch: refs/heads/master
Commit: 280a4003a6b68b286beac6dc31ff84772d5b5b84
Parents: 5508460
Author: ajantha-bhat <aj...@gmail.com>
Authored: Thu Apr 5 15:24:36 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Apr 5 18:56:01 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  42 ++-
 .../core/datamap/dev/DataMapFactory.java        |   6 +-
 .../datamap/dev/expr/AndDataMapExprWrapper.java |  11 +-
 .../datamap/dev/expr/DataMapExprWrapper.java    |   4 +-
 .../dev/expr/DataMapExprWrapperImpl.java        |   8 +-
 .../datamap/dev/expr/OrDataMapExprWrapper.java  |  11 +-
 .../core/indexstore/BlockletDetailsFetcher.java |  13 +-
 .../indexstore/SegmentPropertiesFetcher.java    |   5 +-
 .../blockletindex/BlockletDataMapFactory.java   |  52 +--
 .../core/metadata/schema/SchemaReader.java      |   9 +-
 .../core/metadata/schema/table/CarbonTable.java |  17 +
 .../schema/table/CarbonTableBuilder.java        |  15 +-
 .../core/metadata/schema/table/TableInfo.java   |  17 +
 .../LatestFilesReadCommittedScope.java          | 155 ++++++++
 .../ReadCommittedIndexFileSnapShot.java         |  46 +++
 .../core/readcommitter/ReadCommittedScope.java  |  46 +++
 .../TableStatusReadCommittedScope.java          |  79 ++++
 .../executor/impl/AbstractQueryExecutor.java    |   3 +-
 .../SegmentUpdateStatusManager.java             |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  17 +-
 .../examples/MinMaxIndexDataMapFactory.java     |   9 +-
 .../lucene/LuceneCoarseGrainDataMapFactory.java |   9 +-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  10 +-
 .../hadoop/api/CarbonFileInputFormat.java       |  13 +-
 .../hadoop/api/CarbonInputFormat.java           |  10 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  45 ++-
 .../hadoop/api/DistributableDataMapFormat.java  |  15 +-
 .../createTable/TestUnmanagedCarbonTable.scala  | 369 +++++++++++++++++++
 .../testsuite/datamap/CGDataMapTestCase.scala   |   6 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   5 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |   8 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  11 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |   5 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   4 +-
 .../org/apache/spark/util/PartitionUtils.scala  |   2 +-
 .../carbondata/spark/util/CarbonSparkUtil.scala |   8 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   2 +
 .../org/apache/spark/sql/CarbonSource.scala     |  15 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |   5 +
 .../CarbonAlterTableCompactionCommand.scala     |   4 +
 .../CarbonDeleteLoadByIdCommand.scala           |   5 +
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   5 +
 .../management/CarbonLoadDataCommand.scala      |   3 +
 .../management/CarbonShowLoadsCommand.scala     |   4 +
 .../CarbonProjectForDeleteCommand.scala         |   5 +
 .../CarbonProjectForUpdateCommand.scala         |   4 +
 .../table/CarbonCreateTableCommand.scala        |   4 +
 .../datasources/SparkCarbonFileFormat.scala     |   5 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  12 +
 .../spark/sql/hive/CarbonFileMetastore.scala    |  87 ++++-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  15 +-
 .../loading/CarbonDataLoadConfiguration.java    |  10 +
 .../loading/DataLoadProcessBuilder.java         |   2 +
 .../loading/model/CarbonLoadModel.java          |  17 +
 .../loading/model/CarbonLoadModelBuilder.java   |   5 +-
 .../processing/loading/model/LoadOption.java    |   5 +-
 .../store/CarbonFactDataHandlerModel.java       |  10 +-
 .../carbondata/sdk/file/CarbonReader.java       |   4 +-
 .../sdk/file/CarbonReaderBuilder.java           |   6 +-
 .../sdk/file/CarbonWriterBuilder.java           |  36 +-
 .../sdk/file/CSVUnManagedCarbonWriterTest.java  | 277 ++++++++++++++
 .../carbondata/sdk/file/CarbonReaderTest.java   |   2 +-
 62 files changed, 1468 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 81571ce..6689e3b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
 import org.apache.carbondata.events.OperationContext;
@@ -79,26 +80,30 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param segments
    * @param filterExp
+   * @param readCommittedScope
    * @return
    */
   public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp,
-      List<PartitionSpec> partitions) throws IOException {
+      List<PartitionSpec> partitions, ReadCommittedScope readCommittedScope) throws IOException {
     List<ExtendedBlocklet> blocklets = new ArrayList<>();
     SegmentProperties segmentProperties;
     for (Segment segment : segments) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       // if filter is not passed then return all the blocklets
       if (filterExp == null) {
-        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
+        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions,
+            readCommittedScope);
       } else {
-        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
-        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
+        List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
+        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment,
+            readCommittedScope);
         for (DataMap dataMap : dataMaps) {
           pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
         }
       }
-      blocklets.addAll(addSegmentId(blockletDetailsFetcher
-          .getExtendedBlocklets(pruneBlocklets, segment), segment.getSegmentNo()));
+      blocklets.addAll(addSegmentId(
+          blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment, readCommittedScope),
+          segment.getSegmentNo()));
     }
     return blocklets;
   }
@@ -138,19 +143,20 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param distributable
    * @param filterExp
+   * @param readCommittedScope
    * @return
    */
   public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
-      FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException {
+      FilterResolverIntf filterExp, List<PartitionSpec> partitions,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     List<Blocklet> blocklets = new ArrayList<>();
-    List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
+    List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable, readCommittedScope);
     for (DataMap dataMap : dataMaps) {
-      blocklets.addAll(
-          dataMap.prune(
-              filterExp,
-              segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()),
-              partitions));
+      blocklets.addAll(dataMap.prune(filterExp,
+          segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment(),
+              readCommittedScope),
+          partitions));
     }
     BlockletSerializer serializer = new BlockletSerializer();
     String writePath =
@@ -160,8 +166,8 @@ public final class TableDataMap extends OperationEventListener {
       FileFactory.mkdirs(writePath, FileFactory.getFileType(writePath));
     }
     for (Blocklet blocklet : blocklets) {
-      ExtendedBlocklet detailedBlocklet =
-          blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment());
+      ExtendedBlocklet detailedBlocklet = blockletDetailsFetcher
+          .getExtendedBlocklet(blocklet, distributable.getSegment(), readCommittedScope);
       if (dataMapFactory.getDataMapType() == DataMapLevel.FG) {
         String blockletwritePath =
             writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime();
@@ -208,14 +214,16 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @param segments
    * @param filterExp
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
+  public List<Segment> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     List<Segment> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (Segment segment : segments) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment, readCommittedScope);
       for (DataMap dataMap : dataMaps) {
         if (dataMap.isScanRequired(filterExp)) {
           // If any one task in a given segment contains the data that means the segment need to

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index 48038b7..d27b255 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.events.Event;
 
 /**
@@ -47,12 +48,13 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Get the datamap for segmentid
    */
-  List<T> getDataMaps(Segment segment) throws IOException;
+  List<T> getDataMaps(Segment segment, ReadCommittedScope readCommittedScope) throws IOException;
 
   /**
    * Get datamaps for distributable object.
    */
-  List<T> getDataMaps(DataMapDistributable distributable) throws IOException;
+  List<T> getDataMaps(DataMapDistributable distributable, ReadCommittedScope readCommittedScope)
+      throws IOException;
 
   /**
    * Get all distributable objects of a segmentid

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
index 74469d7..850e08a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -44,10 +45,12 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper {
     this.resolverIntf = resolverIntf;
   }
 
-  @Override public List<ExtendedBlocklet> prune(List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
-    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+  @Override
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
+    List<ExtendedBlocklet> rightPrune =
+        right.prune(segments, partitionsToPrune, readCommittedScope);
     List<ExtendedBlocklet> andBlocklets = new ArrayList<>();
     for (ExtendedBlocklet blocklet : leftPrune) {
       if (rightPrune.contains(blocklet)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
index 14cfc33..b5fb173 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -36,7 +37,8 @@ public interface DataMapExprWrapper extends Serializable {
    * It get the blocklets from each leaf node datamap and apply expressions on the blocklets
    * using list of segments, it is used in case on non distributable datamap.
    */
-  List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
+  List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
index c6b011c..ffd4f80 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 public class DataMapExprWrapperImpl implements DataMapExprWrapper {
@@ -45,9 +46,10 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper {
     this.uniqueId = UUID.randomUUID().toString();
   }
 
-  @Override public List<ExtendedBlocklet> prune(List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
-    return dataMap.prune(segments, expression, partitionsToPrune);
+  @Override
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    return dataMap.prune(segments, expression, partitionsToPrune, readCommittedScope);
   }
 
   @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
index 37cd5dd..0667d0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
 /**
@@ -46,10 +47,12 @@ public class OrDataMapExprWrapper implements DataMapExprWrapper {
     this.resolverIntf = resolverIntf;
   }
 
-  @Override public List<ExtendedBlocklet> prune(List<Segment> segments,
-      List<PartitionSpec> partitionsToPrune) throws IOException {
-    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune);
-    List<ExtendedBlocklet> rightPrune = right.prune(segments, partitionsToPrune);
+  @Override
+  public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    List<ExtendedBlocklet> leftPrune = left.prune(segments, partitionsToPrune, readCommittedScope);
+    List<ExtendedBlocklet> rightPrune =
+        right.prune(segments, partitionsToPrune, readCommittedScope);
     Set<ExtendedBlocklet> andBlocklets = new HashSet<>();
     andBlocklets.addAll(leftPrune);
     andBlocklets.addAll(rightPrune);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 58c11db..cf283f2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * Fetches the detailed blocklet which has more information to execute the query
@@ -31,10 +32,12 @@ public interface BlockletDetailsFetcher {
    *
    * @param blocklets
    * @param segment
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
+  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException;
 
   /**
@@ -42,17 +45,21 @@ public interface BlockletDetailsFetcher {
    *
    * @param blocklet
    * @param segment
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
+  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException;
 
   /**
    * Get all the blocklets in a segment
    *
    * @param segment
+   * @param readCommittedScope
    * @return
    */
-  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
+  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
+      ReadCommittedScope readCommittedScope)
       throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
index 6f94be5..d464083 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * Fetches the detailed segmentProperties which has more information to execute the query
@@ -30,8 +31,10 @@ public interface SegmentPropertiesFetcher {
   /**
    * get the Segment properties based on the SegmentID.
    * @param segmentId
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  SegmentProperties getSegmentProperties(Segment segment) throws IOException;
+  SegmentProperties getSegmentProperties(Segment segment, ReadCommittedScope readCommittedScope)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 9674958..2425c4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -82,29 +83,23 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     throw new UnsupportedOperationException("not implemented");
   }
 
-  @Override
-  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
+  @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
-  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
-      Segment segment) throws IOException {
+  private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    if (readCommittedScope == null) {
+      throw new IOException("readCommittedScope is null. Internal error");
+    }
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      Map<String, String> indexFiles;
-      if (segment.getSegmentFileName() == null) {
-        String path =
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-        indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
-      } else {
-        SegmentFileStore fileStore =
-            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
-        indexFiles = fileStore.getIndexFiles();
-      }
+      Map<String, String> indexFiles = readCommittedScope.getCommittedIndexFile(segment);
       for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
         Path indexFile = new Path(indexFileEntry.getKey());
         tableBlockIndexUniqueIdentifiers.add(
@@ -122,7 +117,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
    * default datamap.
    */
   @Override
-  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
+  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     // If it is already detailed blocklet then type cast and return same
@@ -133,7 +129,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
       return detailedBlocklets;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
       detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
@@ -142,13 +138,14 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
+  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     if (blocklet instanceof ExtendedBlocklet) {
       return (ExtendedBlocklet) blocklet;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segment);
+        getTableBlockIndexUniqueIdentifiers(segment, readCommittedScope);
     return getExtendedBlocklet(identifiers, blocklet);
   }
 
@@ -228,7 +225,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
   }
 
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
@@ -264,8 +262,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return null;
   }
 
-  @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
-    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
+  @Override public SegmentProperties getSegmentProperties(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
+    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
     assert (dataMaps.size() > 0);
     CoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
     assert (coarseGrainDataMap instanceof BlockletDataMap);
@@ -273,12 +272,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     return dataMap.getSegmentProperties();
   }
 
-  @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
-      throws IOException {
+  @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<Blocklet> blocklets = new ArrayList<>();
-    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment);
+    List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope);
     for (CoarseGrainDataMap dataMap : dataMaps) {
-      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segment), partitions));
+      blocklets.addAll(
+          dataMap.prune(null, getSegmentProperties(segment, readCommittedScope), partitions));
     }
     return blocklets;
   }

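The factory no longer derives index file locations from the table path itself; it asks the scope for the committed index files of the segment. A short sketch of driving it through the two fetcher interfaces it implements (the factory instance is assumed to be initialized by the DataMap store; passing null partitions to skip partition pruning is an assumption):

import java.io.IOException;
import java.util.List;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;

public class BlockletDataMapFactorySketch {
  static List<Blocklet> allBlocklets(BlockletDataMapFactory factory, Segment segment,
      ReadCommittedScope scope) throws IOException {
    // Both calls end up in scope.getCommittedIndexFile(segment) to locate the index files.
    SegmentProperties properties = factory.getSegmentProperties(segment, scope);
    // null partitions: no partition pruning (assumption for this sketch).
    return factory.getAllBlocklets(segment, null, scope);
  }
}
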
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 54814cd..8692f13 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -80,14 +80,13 @@ public class SchemaReader {
         identifier.getTablePath());
   }
 
-
-  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier)
-      throws IOException {
+  public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
+      boolean isCarbonFileProvider) throws IOException {
     // This routine is going to infer schema from the carbondata file footer
     // Convert the ColumnSchema -> TableSchema -> TableInfo.
     // Return the TableInfo.
-    org.apache.carbondata.format.TableInfo tableInfo =
-        CarbonUtil.inferSchema(identifier.getTablePath(), identifier.getTableName());
+    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
+        .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider);
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
     TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         tableInfo, identifier.getDatabaseName(), identifier.getTableName(),

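Since an unmanaged table has no schema file, its schema is inferred from a data file footer. A sketch under the assumption that the identifier points at a directory of carbondata files written by the SDK; the boolean selects the CarbonFile layout under /Fact/Part0/Segment_null when true, or the given path itself when false:

import java.io.IOException;

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.SchemaReader;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;

public class InferSchemaSketch {
  static CarbonTable inferUnmanaged(AbsoluteTableIdentifier identifier) throws IOException {
    // Reads the footer of a data file found directly under the table path.
    TableInfo inferredTableInfo = SchemaReader.inferSchema(identifier, false);
    return CarbonTable.buildFromTableInfo(inferredTableInfo);
  }
}
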
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 9d50048..d3eab6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -143,6 +143,14 @@ public class CarbonTable implements Serializable {
 
   private boolean hasDataMapSchema;
 
+  /**
+   * Flag indicating whether the data was written for an unmanaged table
+   * or for a managed table. The difference is that an unmanaged table has
+   * no Metadata folder and consequently no tablestatus or schema files
+   * on disk.
+   */
+  private boolean isUnManagedTable;
+
   private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
     this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -241,6 +249,7 @@ public class CarbonTable implements Serializable {
     table.blockSize = tableInfo.getTableBlockSizeInMB();
     table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
     table.tableUniqueName = tableInfo.getTableUniqueName();
+    table.setUnManagedTable(tableInfo.isUnManagedTable());
     table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
     table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
     if (tableInfo.getFactTable().getBucketingInfo() != null) {
@@ -990,4 +999,12 @@ public class CarbonTable implements Serializable {
   public static CarbonTableBuilder builder() {
     return new CarbonTableBuilder();
   }
+
+  public boolean isUnManagedTable() {
+    return isUnManagedTable;
+  }
+
+  public void setUnManagedTable(boolean unManagedTable) {
+    isUnManagedTable = unManagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
index 27808f8..82d0246 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
@@ -28,16 +28,15 @@ public class CarbonTableBuilder {
   private String tableName;
   private String databaseName;
   private String tablePath;
+  private boolean unManagedTable;
   private TableSchema tableSchema;
 
   public CarbonTableBuilder tableName(String tableName) {
-    Objects.requireNonNull(tableName, "tableName should not be null");
     this.tableName = tableName;
     return this;
   }
 
   public CarbonTableBuilder databaseName(String databaseName) {
-    Objects.requireNonNull(databaseName, "databaseName should not be null");
     this.databaseName = databaseName;
     return this;
   }
@@ -48,6 +47,13 @@ public class CarbonTableBuilder {
     return this;
   }
 
+
+  public CarbonTableBuilder isUnManagedTable(boolean isUnManagedTable) {
+    Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null");
+    this.unManagedTable = isUnManagedTable;
+    return this;
+  }
+
   public CarbonTableBuilder tableSchema(TableSchema tableSchema) {
     Objects.requireNonNull(tableSchema, "tableSchema should not be null");
     this.tableSchema = tableSchema;
@@ -55,16 +61,17 @@ public class CarbonTableBuilder {
   }
 
   public CarbonTable build() {
-    Objects.requireNonNull(tableName, "tableName should not be null");
-    Objects.requireNonNull(databaseName, "databaseName should not be null");
     Objects.requireNonNull(tablePath, "tablePath should not be null");
     Objects.requireNonNull(tableSchema, "tableSchema should not be null");
+    Objects.requireNonNull(unManagedTable, "UnManaged Table should not be null");
+
 
     TableInfo tableInfo = new TableInfo();
     tableInfo.setDatabaseName(databaseName);
     tableInfo.setTableUniqueName(databaseName + "_" + tableName);
     tableInfo.setFactTable(tableSchema);
     tableInfo.setTablePath(tablePath);
+    tableInfo.setUnManagedTable(unManagedTable);
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0));
     return CarbonTable.buildFromTableInfo(tableInfo);

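With the unmanaged flag, table name and database name become optional in the builder; only the path, the schema and the flag are validated. A sketch of describing an unmanaged table (the schema object is assumed to be built elsewhere; names are illustrative):

import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;

public class UnmanagedTableBuildSketch {
  static CarbonTable describe(String writerOutputPath, TableSchema schema) {
    return CarbonTable.builder()
        .tableName("_tempTable")       // optional for unmanaged tables
        .databaseName("default")       // optional for unmanaged tables
        .tablePath(writerOutputPath)
        .tableSchema(schema)
        .isUnManagedTable(true)        // no Metadata folder is expected on read
        .build();
  }
}
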
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 47e09d0..3e7ea62 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -77,6 +77,14 @@ public class TableInfo implements Serializable, Writable {
    */
   private String tablePath;
 
+  /**
+   * Flag indicating whether the data was written for an unmanaged table
+   * or for a managed table. The difference is that an unmanaged table has
+   * no Metadata folder and consequently no tablestatus or schema files
+   * on disk.
+   */
+  private boolean isUnManagedTable;
+
   // this identifier is a lazy field which will be created when it is used first time
   private AbsoluteTableIdentifier identifier;
 
@@ -240,6 +248,7 @@ public class TableInfo implements Serializable, Writable {
     factTable.write(out);
     out.writeLong(lastUpdatedTime);
     out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
+    out.writeBoolean(isUnManagedTable);
     boolean isChildSchemaExists =
         null != dataMapSchemaList && dataMapSchemaList.size() > 0;
     out.writeBoolean(isChildSchemaExists);
@@ -267,6 +276,7 @@ public class TableInfo implements Serializable, Writable {
     this.factTable.readFields(in);
     this.lastUpdatedTime = in.readLong();
     this.tablePath = in.readUTF();
+    this.isUnManagedTable = in.readBoolean();
     boolean isChildSchemaExists = in.readBoolean();
     this.dataMapSchemaList = new ArrayList<>();
     if (isChildSchemaExists) {
@@ -320,4 +330,11 @@ public class TableInfo implements Serializable, Writable {
     return parentRelationIdentifiers;
   }
 
+  public boolean isUnManagedTable() {
+    return isUnManagedTable;
+  }
+
+  public void setUnManagedTable(boolean unManagedTable) {
+    isUnManagedTable = unManagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
new file mode 100644
index 0000000..631d12c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.readcommitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * This is a readCommittedScope for an unmanaged carbon table.
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public class LatestFilesReadCommittedScope implements ReadCommittedScope {
+
+  private String carbonFilePath;
+  private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot;
+  private LoadMetadataDetails[] loadMetadataDetails;
+  public LatestFilesReadCommittedScope(String path) {
+    this.carbonFilePath = path;
+    try {
+      takeCarbonIndexFileSnapShot();
+    } catch (IOException ex) {
+      throw new RuntimeException("Error while taking index snapshot", ex);
+    }
+  }
+
+  private void prepareLoadMetadata() {
+    int loadCount = 0;
+    Map<String, List<String>> snapshotMap =
+        this.readCommittedIndexFileSnapShot.getSegmentIndexFileMap();
+    LoadMetadataDetails[] loadMetadataDetailsArray = new LoadMetadataDetails[snapshotMap.size()];
+    String segmentID;
+    for (Map.Entry<String, List<String>> entry : snapshotMap.entrySet()) {
+      segmentID = entry.getKey();
+      LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+      long timeSet;
+      try {
+        timeSet = Long.parseLong(segmentID);
+      } catch (NumberFormatException nu) {
+        timeSet = 0;
+      }
+      loadMetadataDetails.setLoadEndTime(timeSet);
+      loadMetadataDetails.setLoadStartTime(timeSet);
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
+      loadMetadataDetails.setLoadName(segmentID);
+      loadMetadataDetailsArray[loadCount++] = loadMetadataDetails;
+    }
+    this.loadMetadataDetails = loadMetadataDetailsArray;
+  }
+
+  @Override public LoadMetadataDetails[] getSegmentList() throws IOException {
+    try {
+      if (loadMetadataDetails == null) {
+        takeCarbonIndexFileSnapShot();
+      }
+      return loadMetadataDetails;
+
+    } catch (IOException ex) {
+      throw new IOException("Problem encountered while reading the Table Status file.", ex);
+    }
+  }
+
+  @Override public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
+    Map<String, String> indexFileStore = new HashMap<>();
+    Map<String, List<String>> snapShot = readCommittedIndexFileSnapShot.getSegmentIndexFileMap();
+    String segName;
+    if (segment.getSegmentNo() != null) {
+      segName = segment.getSegmentNo();
+    } else {
+      segName = segment.getSegmentFileName();
+    }
+    List<String> index = snapShot.get(segName);
+    for (String indexPath : index) {
+      indexFileStore.put(indexPath, null);
+    }
+    return indexFileStore;
+  }
+
+  private String getSegmentID(String carbonIndexFileName, String indexFilePath) {
+    if (indexFilePath.contains("/Fact/Part0/Segment_")) {
+      // This is the CarbonFile case, where the index files sit inside the segment folder,
+      // so the segment id has to be extracted from the path, not from the CarbonIndex file name.
+      String segString = indexFilePath.substring(0, indexFilePath.lastIndexOf("/") + 1);
+      String segName = segString
+          .substring(segString.lastIndexOf("_") + 1, segString.lastIndexOf("/"));
+      return segName;
+    } else {
+      String fileName = carbonIndexFileName;
+      String segId = fileName.substring(fileName.lastIndexOf("-") + 1, fileName.lastIndexOf("."));
+      return segId;
+    }
+  }
+
+  @Override public void takeCarbonIndexFileSnapShot() throws IOException {
+    // Read the current file Path get the list of indexes from the path.
+    CarbonFile file = FileFactory.getCarbonFile(carbonFilePath);
+    Map<String, List<String>> indexFileStore = new HashMap<>();
+    if (file.isDirectory()) {
+      CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath);
+      for (int i = 0; i < carbonIndexFiles.length; i++) {
+        // TODO. If Required to support merge index, then this code has to be modified.
+        // TODO. Nested File Paths.
+        if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+          // Get Segment Name from the IndexFile.
+          String segId =
+              getSegmentID(carbonIndexFiles[i].getName(), carbonIndexFiles[i].getAbsolutePath());
+          // TODO. During Partition table handling, place Segment File Name.
+          List<String> indexList;
+          if (indexFileStore.get(segId) == null) {
+            indexList = new ArrayList<>(1);
+          } else {
+            // Entry is already present.
+            indexList = indexFileStore.get(segId);
+          }
+          indexList.add(carbonIndexFiles[i].getAbsolutePath());
+          indexFileStore.put(segId, indexList);
+        }
+      }
+      ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot =
+          new ReadCommittedIndexFileSnapShot(indexFileStore);
+      this.readCommittedIndexFileSnapShot = readCommittedIndexFileSnapShot;
+      prepareLoadMetadata();
+    } else {
+      throw new IOException("Path does not point to a directory");
+    }
+  }
+
+}

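A sketch of exercising this scope directly: the constructor snapshots the carbonindex files under the given directory and synthesizes one successful load entry per discovered segment (the path is illustrative):

import java.io.IOException;
import java.util.Map;

import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;

public class LatestFilesScopeSketch {
  static void printCommittedIndexFiles(String unmanagedTablePath) throws IOException {
    // The constructor takes the index file snapshot immediately.
    LatestFilesReadCommittedScope scope = new LatestFilesReadCommittedScope(unmanagedTablePath);
    for (LoadMetadataDetails load : scope.getSegmentList()) {
      Segment segment = new Segment(load.getLoadName(), null);
      // Without merge index files, each committed index path maps to a null value.
      Map<String, String> indexFiles = scope.getCommittedIndexFile(segment);
      System.out.println(segment.getSegmentNo() + " -> " + indexFiles.keySet());
    }
  }
}
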
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
new file mode 100644
index 0000000..4491a29
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedIndexFileSnapShot.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.readcommitter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This class holds the index files captured in the snapshot taken
+ * through the readCommitter interface.
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Evolving
+public class ReadCommittedIndexFileSnapShot {
+
+  /**
+   * Segment Numbers are mapped with list of Index Files.
+   */
+  private Map<String, List<String>> segmentIndexFileMap;
+
+  public ReadCommittedIndexFileSnapShot(Map<String, List<String>> segmentIndexFileMap) {
+    this.segmentIndexFileMap = segmentIndexFileMap;
+  }
+
+  public Map<String, List<String>> getSegmentIndexFileMap() {
+    return segmentIndexFileMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
new file mode 100644
index 0000000..f6ba78e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.readcommitter;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+
+/**
+ * ReadCommitted interface that defines a read scope.
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public interface ReadCommittedScope {
+
+  public LoadMetadataDetails[] getSegmentList() throws IOException;
+
+  /**
+   * @param segment
+   * @return without merge index: a map keyed by the absolute index file path, with null values;
+   *         with merge index: a map keyed by the absolute path and file name of the merge index
+   *         parent file, with the merge index file name as the value
+   * @throws IOException
+   */
+  public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException;
+
+  public void takeCarbonIndexFileSnapShot() throws IOException;
+}

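Two implementations of this interface follow; which one applies is decided by the table type. A minimal selection sketch mirroring the decision the input format makes later in this patch (here the unmanaged flag is passed in as a plain parameter rather than read from the job configuration):

import java.io.IOException;

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;

public class ScopeSelectionSketch {
  static ReadCommittedScope scopeFor(AbsoluteTableIdentifier identifier, boolean isUnmanagedTable)
      throws IOException {
    // Unmanaged tables carry no tablestatus file, so the files on disk are the commit point.
    return isUnmanagedTable
        ? new LatestFilesReadCommittedScope(identifier.getTablePath())
        : new TableStatusReadCommittedScope(identifier);
  }
}
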
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
new file mode 100644
index 0000000..4f54241
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.readcommitter;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * ReadCommittedScope for the managed carbon table
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public class TableStatusReadCommittedScope implements ReadCommittedScope {
+  private LoadMetadataDetails[] loadMetadataDetails;
+  private AbsoluteTableIdentifier identifier;
+
+  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier) throws IOException {
+    this.identifier = identifier;
+    takeCarbonIndexFileSnapShot();
+  }
+
+  @Override public LoadMetadataDetails[] getSegmentList() throws IOException {
+    try {
+      if (loadMetadataDetails == null) {
+        takeCarbonIndexFileSnapShot();
+      }
+      return loadMetadataDetails;
+
+    } catch (IOException ex) {
+      throw new IOException("Problem encountered while reading the Table Status file.", ex);
+    }
+  }
+
+  @Override public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
+    Map<String, String> indexFiles;
+    if (segment.getSegmentFileName() == null) {
+      String path =
+          CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+      indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+    } else {
+      SegmentFileStore fileStore =
+          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+      indexFiles = fileStore.getIndexFiles();
+    }
+    return indexFiles;
+  }
+
+  @Override public void takeCarbonIndexFileSnapShot() throws IOException {
+    // Only the segment information is updated here.
+    // File information is fetched on the fly based on the fetched segment info.
+    this.loadMetadataDetails = SegmentStatusManager
+        .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 06c255e..676976a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -308,7 +308,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
             queryModel.getProjectionDimensions(), tableBlockDimensions,
             segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size());
     blockExecutionInfo.setBlockId(
-        CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId));
+        CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
+            queryModel.getTable().getTableInfo().isUnManagedTable()));
     blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 308fe30..0e2976a 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -240,7 +240,7 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId);
+    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, false);
     String tupleId;
     if (isPartitionTable) {
       tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 8e21d46..09692e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2336,10 +2336,14 @@ public final class CarbonUtil {
    *
    * @return table info containing the schema
    */
-  public static org.apache.carbondata.format.TableInfo inferSchema(
-      String carbonDataFilePath, String tableName) throws IOException {
-    List<String> filePaths =
-        getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+  public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDataFilePath,
+      String tableName, boolean isCarbonFileProvider) throws IOException {
+    List<String> filePaths;
+    if (isCarbonFileProvider) {
+      filePaths = getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+    } else {
+      filePaths = getFilePathExternalFilePath(carbonDataFilePath);
+    }
     String fistFilePath = null;
     try {
       fistFilePath = filePaths.get(0);
@@ -2910,13 +2914,14 @@ public final class CarbonUtil {
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
-      String segmentId) {
+      String segmentId, boolean isUnmanagedTable) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
+
     if (filePath.startsWith(tablePath)) {
       String factDir = CarbonTablePath.getFactDir(tablePath);
-      if (filePath.startsWith(factDir)) {
+      if (filePath.startsWith(factDir) || isUnmanagedTable) {
         blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
             + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       } else {

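For an unmanaged table the data files can sit directly under the table path with no Fact directory, so the block id is still normalized into the Part0/Segment_<id> form. A sketch of the expected shape (the identifier, file name and segment id are illustrative):

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.util.CarbonUtil;

public class BlockIdSketch {
  static String unmanagedBlockId(AbsoluteTableIdentifier identifier) {
    // A data file written directly under the table path (illustrative name).
    String filePath = identifier.getTablePath() + "/part-0-0_batchno0-0-1522222222222.carbondata";
    // Expected: "Part0/Segment_null/part-0-0_batchno0-0-1522222222222.carbondata",
    // because the unmanaged flag bypasses the Fact directory check.
    return CarbonUtil.getBlockId(identifier, filePath, "null", true);
  }
}
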
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 45dee2a..a2f92c9 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactor
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
@@ -61,10 +62,13 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
    * getDataMaps Factory method Initializes the Min Max Data Map and returns.
    *
    * @param segment
+   * @param readCommittedScope
    * @return
    * @throws IOException
    */
-  @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment)
+  @Override
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     List<CoarseGrainDataMap> dataMapList = new ArrayList<>();
     // Form a dataMap of Type MinMaxIndexDataMap.
@@ -101,7 +105,8 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   @Override public void clear() {
   }
 
-  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index 04160c0..7308841 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * FG level of lucene DataMap
@@ -43,7 +44,8 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
    * Get the datamap for segmentid
    */
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
+  public List<CoarseGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<CoarseGrainDataMap> lstDataMap = new ArrayList<>();
     CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer);
     try {
@@ -62,9 +64,10 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co
    * Get datamaps for distributable object.
    */
   @Override
-  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
-    return getDataMaps(distributable.getSegment());
+    return getDataMaps(distributable.getSegment(), readCommittedScope);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index a9376bc..23c9928 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 
 /**
  * CG level of lucene DataMap
@@ -38,8 +39,8 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
   /**
    * Get the datamap for segmentid
    */
-  @Override
-  public List<FineGrainDataMap> getDataMaps(Segment segment) throws IOException {
+  @Override public List<FineGrainDataMap> getDataMaps(Segment segment,
+      ReadCommittedScope readCommittedScope) throws IOException {
     List<FineGrainDataMap> lstDataMap = new ArrayList<>();
     FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
     try {
@@ -58,9 +59,10 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
    * Get datamaps for distributable object.
    */
   @Override
-  public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable)
+  public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable,
+      ReadCommittedScope readCommittedScope)
       throws IOException {
-    return getDataMaps(distributable.getSegment());
+    return getDataMaps(distributable.getSegment(), readCommittedScope);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index c352a95..214e534 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -36,6 +36,8 @@ import org.apache.carbondata.core.metadata.schema.SchemaReader;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -78,7 +80,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
             .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
         if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
           TableInfo tableInfoInfer =
-              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+              SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration), true);
           localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
         } else {
           localCarbonTable =
@@ -114,6 +116,8 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
       // get all valid segments and set them into the configuration
       // check for externalTable segment (Segment_null)
       // process and resolve the expression
+      ReadCommittedScope readCommittedScope = new LatestFilesReadCommittedScope(
+          identifier.getTablePath() + "/Fact/Part0/Segment_null/");
       Expression filter = getFilterPredicates(job.getConfiguration());
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       // this will be null in case of corrupt schema file.
@@ -138,7 +142,8 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         }
         // do block filtering and get split
         List<InputSplit> splits =
-            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null);
+            getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null,
+                readCommittedScope);
 
         return splits;
       }
@@ -156,7 +161,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
    */
   private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver,
       List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList) throws IOException {
+      List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
 
     numSegments = validSegments.size();
     List<InputSplit> result = new LinkedList<InputSplit>();
@@ -170,7 +175,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     // for each segment fetch blocks matching filter in Driver BTree
     List<CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+            validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
     numBlocks = dataBlocksOfSegment.size();
     for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index f85e0e9..be97d05 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.SingleTableProvider;
 import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -100,6 +101,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       "mapreduce.input.carboninputformat.filter.predicate";
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
   private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+  private static final String UNMANAGED_TABLE = "mapreduce.input.carboninputformat.unmanaged";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
   private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
@@ -160,6 +162,10 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
 
+  public static void setUnmanagedTable(Configuration configuration, boolean isUnmanagedTable) {
+    configuration.set(UNMANAGED_TABLE, String.valueOf(isUnmanagedTable));
+  }
+
   public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
     configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
   }
@@ -332,7 +338,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
       CarbonTable carbonTable, FilterResolverIntf resolver,
       BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
-      List<Integer> oldPartitionIdList) throws IOException {
+      List<Integer> oldPartitionIdList, ReadCommittedScope readCommittedScope) throws IOException {
 
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
@@ -356,7 +362,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       // Apply expression on the blocklets.
       prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
     } else {
-      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+      prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune, readCommittedScope);
     }
 
     List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index b4a5c5e..06ada3d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -45,6 +45,9 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.mutate.data.BlockMappingVO;
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
@@ -89,10 +92,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+  private static final String CARBON_UNMANAGED_TABLE =
+      "mapreduce.input.carboninputformat.unmanaged";
   public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
+  private ReadCommittedScope readCommittedScope;
 
   /**
    * Get the cached CarbonTable or create it by TableInfo in `configuration`
@@ -128,12 +134,14 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   @Override
   public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
-    LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
-        .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
+
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
+    this.readCommittedScope = getReadCommitted(job, identifier);
+    LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
+
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
     List<Segment> invalidSegments = new ArrayList<>();
@@ -413,7 +421,13 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<Segment> segmentList = new ArrayList<>();
     segmentList.add(new Segment(targetSegment, null));
     setSegmentsToAccess(job.getConfiguration(), segmentList);
+
     try {
+      carbonTable = getOrCreateCarbonTable(job.getConfiguration());
+      ReadCommittedScope readCommittedScope =
+          getReadCommitted(job, carbonTable.getAbsoluteTableIdentifier());
+      this.readCommittedScope = readCommittedScope;
+
       // process and resolve the expression
       Expression filter = getFilterPredicates(job.getConfiguration());
       CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
@@ -508,7 +522,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     // for each segment fetch blocks matching filter in Driver BTree
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
         getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+            validSegments, partitionInfo, oldPartitionIdList, readCommittedScope);
     numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
 
@@ -559,8 +573,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       List<PartitionSpec> partitions) throws IOException {
     AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
     TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
-    LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
-        .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
+
+    ReadCommittedScope readCommittedScope = getReadCommitted(job, identifier);
+    LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList();
+
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
         table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
@@ -571,7 +587,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     // TODO: currently only batch segment is supported, add support for streaming table
     List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments(), false);
 
-    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions);
+    List<ExtendedBlocklet> blocklets =
+        blockletMap.prune(filteredSegment, null, partitions, readCommittedScope);
     for (ExtendedBlocklet blocklet : blocklets) {
       String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);
@@ -601,4 +618,18 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
     return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
   }
-}
+
+  public ReadCommittedScope getReadCommitted(JobContext job, AbsoluteTableIdentifier identifier)
+      throws IOException {
+    if (readCommittedScope == null) {
+      ReadCommittedScope readCommittedScope;
+      if (job.getConfiguration().getBoolean(CARBON_UNMANAGED_TABLE, false)) {
+        readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
+      } else {
+        readCommittedScope = new TableStatusReadCommittedScope(identifier);
+      }
+      this.readCommittedScope = readCommittedScope;
+    }
+    return readCommittedScope;
+  }
+}
\ No newline at end of file

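[Editor's note] The heart of this patch is the new getReadCommitted() hook added above: split planning no longer reads tablestatus directly, but asks a ReadCommittedScope for the segment list, and the scope implementation depends on whether the table is unmanaged. The following is a minimal sketch of that selection logic only, not part of the commit: the helper class and its boolean parameter are hypothetical, the two scope constructors and getTablePath() are taken verbatim from the hunks above, and the import packages are assumed from the org.apache.carbondata.core.readcommitter imports shown in the DistributableDataMapFormat diff below.

import java.io.IOException;

import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;

public final class ReadCommittedScopeSelector {

  private ReadCommittedScopeSelector() {
  }

  // Unmanaged (SDK-written) tables have no tablestatus file, so the committed
  // view is taken from the latest carbondata/index files on disk; managed
  // tables keep resolving the segment list through tablestatus.
  public static ReadCommittedScope select(boolean isUnmanagedTable,
      AbsoluteTableIdentifier identifier) throws IOException {
    if (isUnmanagedTable) {
      // corresponds to the CARBON_UNMANAGED_TABLE == true branch in getReadCommitted()
      return new LatestFilesReadCommittedScope(identifier.getTablePath());
    }
    // default branch for managed tables, as in the patch above
    return new TableStatusReadCommittedScope(identifier);
  }
}

In the committed code the boolean comes from the job configuration (the CARBON_UNMANAGED_TABLE key checked in getReadCommitted()), and the chosen scope is cached on the input format so that getSplits(), the single-segment path and getBlockRowCount() all share one committed view of the segments.
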
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 3f19a3f..deeeabe 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -30,6 +30,8 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 
@@ -101,16 +103,17 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
       private Iterator<ExtendedBlocklet> blockletIterator;
       private ExtendedBlocklet currBlocklet;
 
-      @Override
-      public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
           throws IOException, InterruptedException {
         DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
         TableDataMap dataMap = DataMapStoreManager.getInstance()
             .getDataMap(table, distributable.getDistributable().getDataMapSchema());
-        List<ExtendedBlocklet> blocklets = dataMap.prune(
-            distributable.getDistributable(),
-            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);
-        for (ExtendedBlocklet blocklet: blocklets) {
+        ReadCommittedScope readCommittedScope =
+            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier());
+        List<ExtendedBlocklet> blocklets = dataMap.prune(distributable.getDistributable(),
+            dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions,
+            readCommittedScope);
+        for (ExtendedBlocklet blocklet : blocklets) {
           blocklet.setDataMapUniqueId(distributable.getUniqueId());
         }
         blockletIterator = blocklets.iterator();