You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:30 UTC

[29/50] [abbrv] carbondata git commit: [CARBONDATA-2428] Support flat folder for managed carbon table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 99b536c..b4937e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,12 +17,15 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
@@ -61,7 +64,7 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
@@ -84,7 +87,7 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter(table)
@@ -109,7 +112,7 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter(table)
@@ -138,7 +141,7 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
@@ -167,7 +170,7 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
@@ -190,18 +193,30 @@ class CarbonIndexFileMergeTestCase
     sql("select * from mitable").show()
   }
 
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
-    val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
-    val path = CarbonTablePath
-      .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
-    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
-        .INDEX_FILE_EXT)
-    })
-    if (carbonFiles != null) {
-      carbonFiles.length
+  /**
+   * Counts the index files of a segment that are not associated with a merge file.
+   *
+   * @param tableName unique table name passed to CarbonMetadata.getCarbonTable
+   * @param segmentNo id of the segment to inspect
+   * @return the number of such index files, or 0 when Segment.getSegment yields null
+   */
+  private def getIndexFileCount(tableName: String, segmentNo: String): Int = {
+    val tablePath = CarbonMetadata.getInstance().getCarbonTable(tableName).getTablePath
+    val segmentDir = CarbonTablePath.getSegmentPath(tablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      // Segment folder exists: count the entries mapped to null (presumably "no merge file").
+      val indexToMerge = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir)
+      indexToMerge.asScala.count { case (_, mapped) => mapped == null }
     } else {
-      0
+      // No segment folder: fall back to the segment file and add up the files of every
+      // location that has no merge file name.
+      Option(Segment.getSegment(segmentNo, tablePath)).fold(0) { segment =>
+        new SegmentFileStore(tablePath, segment.getSegmentFileName)
+          .getSegmentFile.getLocationMap.values().asScala
+          .filter(_.getMergeFileName == null)
+          .map(_.getFiles.size())
+          .sum
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 8f891ce..d49b962 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -22,9 +22,12 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   val filePath: String = s"$resourcesPath/globalsort"
@@ -527,8 +530,20 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
 
+  /**
+   * Counts the index files of a segment: from the segment folder when that folder exists,
+   * otherwise through the segment file. Yields 0 when Segment.getSegment returns null.
+   */
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
-    val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
-                segmentNo
-    new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      // Guard against a null result from Segment.getSegment: report 0 index files rather
+      // than failing with a NullPointerException.
+      Option(Segment.getSegment(segmentNo, carbonTable.getTablePath)).fold(0) { segment =>
+        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+          .getIndexCarbonFiles.size()
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 2da1ada..54c19c2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -23,8 +23,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   val filePath: String = s"$resourcesPath/globalsort"
@@ -567,8 +570,20 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA
 
+  /**
+   * Counts the index files of a segment: from the segment folder when that folder exists,
+   * otherwise through the segment file. Yields 0 when Segment.getSegment returns null.
+   */
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
-    val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
-                segmentNo
-    new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      // Guard against a null result from Segment.getSegment: report 0 index files rather
+      // than failing with a NullPointerException.
+      Option(Segment.getSegment(segmentNo, carbonTable.getTablePath)).fold(0) { segment =>
+        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+          .getIndexCarbonFiles.size()
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index f3e12d1..c695b05 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -26,8 +26,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -189,12 +191,22 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
   }
 
+  /**
+   * Counts the index files of a segment: from the segment folder when that folder exists,
+   * otherwise through the segment file. Yields 0 when Segment.getSegment returns null.
+   */
   def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
-      tableName
-    )
-    val segmentDir = carbonTable.getSegmentPath(segmentNo)
-    new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      // Guard against a null result from Segment.getSegment: report 0 index files rather
+      // than failing with a NullPointerException.
+      Option(Segment.getSegment(segmentNo, carbonTable.getTablePath)).fold(0) { segment =>
+        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+          .getIndexCarbonFiles.size()
+      }
+    }
   }
 
   override def afterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index b9d8e12..39785a3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import scala.collection.JavaConverters._
+
 import java.io.{File, FilenameFilter}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -26,7 +28,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 
 class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
   var originVersion = ""
@@ -49,12 +53,20 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     val indexReader = new CarbonIndexFileReader()
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
-    val carbonIndexPaths = new File(segmentDir)
-      .listFiles(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          name.endsWith(CarbonTablePath.getCarbonIndexExtension)
-        }
-      })
+
+    val carbonIndexPaths = if (FileFactory.isFileExist(segmentDir)) {
+      new File(segmentDir)
+        .listFiles(new FilenameFilter {
+          override def accept(dir: File, name: String): Boolean = {
+            name.endsWith(CarbonTablePath.getCarbonIndexExtension)
+          }
+        })
+    } else {
+      val segment = Segment.getSegment("0", carbonTable.getTablePath)
+      val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      store.readIndexFiles()
+      store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
+    }
     for (carbonIndexPath <- carbonIndexPaths) {
       indexReader.openThriftReader(carbonIndexPath.getCanonicalPath)
       assert(indexReader.readIndexHeader().getVersion === 3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index bba75ad..d7b1172 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import scala.collection.JavaConverters._
+
 import java.io.{File, FileWriter}
 
 import org.apache.commons.io.FileUtils
@@ -31,8 +33,10 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -273,7 +277,15 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
-    assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+    if (FileFactory.isFileExist(segmentDir)) {
+      assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+    } else {
+      val segment = Segment.getSegment("0", carbonTable.getTablePath)
+      val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      store.readIndexFiles()
+      val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
+      assertResult(Math.max(7, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
+    }
   }
 
   test("Query with small files") {
@@ -379,6 +391,19 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
+  /**
+   * Counts the index files of a segment: from the segment folder when that folder exists,
+   * otherwise through the segment file. Yields 0 when Segment.getSegment returns null.
+   */
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
-    new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      // Guard against a null result from Segment.getSegment: report 0 index files rather
+      // than failing with a NullPointerException.
+      Option(Segment.getSegment(segmentNo, carbonTable.getTablePath)).fold(0) { segment =>
+        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+          .getIndexCarbonFiles.size()
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b5c3df1..074c807 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -64,8 +64,9 @@ class CGDataMapFactory(
    * Get the datamap for segmentid
    */
   override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = {
-    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)
-    val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+    val path = identifier.getTablePath
+    val file = FileFactory.getCarbonFile(
+      path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
 
     val files = file.listFiles()
     files.map {f =>
@@ -100,8 +101,9 @@ class CGDataMapFactory(
    * @return
    */
   override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
-    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)
-    val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+    val path = identifier.getTablePath
+    val file = FileFactory.getCarbonFile(
+      path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
 
     val files = file.listFiles()
     files.map { f =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 2d666c3..08d8911 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -89,8 +89,9 @@ class FGDataMapFactory(carbonTable: CarbonTable,
    * @return
    */
   override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
-    val path = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segment.getSegmentNo)
-    val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+    val path = carbonTable.getTablePath
+    val file = FileFactory.getCarbonFile(
+      path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
 
     val files = file.listFiles()
     files.map { f =>
@@ -416,7 +417,6 @@ class FGDataMapWriter(carbonTable: CarbonTable,
     stream.write(bytes)
     stream.writeInt(bytes.length)
     stream.close()
-//    commitFile(fgwritepath)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index eaa2ae7..642607c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.datamap
 
+import scala.collection.JavaConverters._
+
 import java.io.{File, FilenameFilter}
 
 import org.apache.spark.sql.Row
@@ -26,7 +28,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -261,12 +265,21 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
          |    group by name
        """.stripMargin)
     assertResult(true)(new File(path).exists())
-    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
-      .list(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
-        }
-      }).length > 0)
+    if (FileFactory.isFileExist(CarbonTablePath.getSegmentPath(path, "0"))) {
+      assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+         .list(new FilenameFilter {
+           override def accept(dir: File, name: String): Boolean = {
+             name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+           }
+         }).length > 0)
+    } else {
+      val segment = Segment.getSegment("0", path)
+      val store = new SegmentFileStore(path, segment.getSegmentFileName)
+      store.readIndexFiles()
+      val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
+      assertResult(true)(size > 0)
+    }
+
     checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
     checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
     sql("drop datamap preagg on table main")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
new file mode 100644
index 0000000..d786d10
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.flatfolder
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Verifies data loading into tables created with the 'flat_folder'='true' table property:
+ * each test loads data.csv, checks that carbondata files are written directly under the
+ * table path, and compares query results against originTable, a table created without
+ * that property.
+ */
+class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll(): Unit = {
+    dropTable()
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  /**
+   * Asserts that at least one carbondata file sits directly under the table path.
+   *
+   * @param tableUniqueName unique table name passed to CarbonMetadata.getCarbonTable
+   * @param segmentId       id of the loaded segment; currently not used by the check
+   */
+  def validateDataFiles(tableUniqueName: String, segmentId: String): Unit = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val files = FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+    assert(files.exists(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)),
+      s"no ${CarbonTablePath.CARBON_DATA_EXT} file directly under ${carbonTable.getTablePath}")
+  }
+
+  test("data loading for flat folder with global sort") {
+    sql(
+      """
+        | CREATE TABLE flatfolder_gs (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('sort_scope'='global_sort', 'flat_folder'='true')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_gs OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_flatfolder_gs", "0")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from flatfolder_gs order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("data loading for flat folder") {
+    sql(
+      """
+        | CREATE TABLE flatfolder (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_flatfolder", "0")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from flatfolder order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("data loading for flat folder pre-agg") {
+    sql(
+      """
+        | CREATE TABLE flatfolder_preagg (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+      """.stripMargin)
+    sql("create datamap p2 on table flatfolder_preagg using 'preaggregate' as select empname, designation, min(salary) from flatfolder_preagg group by empname, designation ")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_preagg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_flatfolder_preagg", "0")
+    validateDataFiles("default_flatfolder_preagg_p2", "0")
+
+    checkAnswer(sql("select empname, designation, min(salary) from flatfolder_preagg group by empname, designation"),
+      sql("select empname, designation, min(salary) from originTable group by empname, designation"))
+
+  }
+
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+    dropTable()
+  }
+
+  /** Drops the four tables this suite creates, if they exist. */
+  def dropTable(): Unit = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists flatfolder")
+    sql("drop table if exists flatfolder_gs")
+    sql("drop table if exists flatfolder_preagg")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index ec39f66..2432715 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -26,6 +26,9 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
 class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
 
@@ -689,7 +692,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
                      CarbonCommonConstants.FILE_SEPARATOR + "t" +
                      CarbonCommonConstants.FILE_SEPARATOR + "Fact" +
                      CarbonCommonConstants.FILE_SEPARATOR + "Part0")
-    assert(f.list().length == 2)
+    if (!FileFactory.isFileExist(
+      CarbonTablePath.getSegmentFilesLocation(
+        dblocation + CarbonCommonConstants.FILE_SEPARATOR +
+        CarbonCommonConstants.FILE_SEPARATOR + "t"))) {
+      assert(f.list().length == 2)
+    }
   }
   test("test sentences func in update statement") {
     sql("drop table if exists senten")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 0eaaec5..133454a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -16,17 +16,22 @@
  */
 package org.apache.carbondata.spark.testsuite.partition
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datamap.Segment
+
 class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
@@ -62,12 +67,20 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
   def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val segmentDir = carbonTable.getSegmentPath(segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".carbondata")
-      }
-    })
+
+    val dataFiles = if (FileFactory.isFileExist(segmentDir)) {
+      val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+      carbonFile.listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          return file.getName.endsWith(".carbondata")
+        }
+      })
+    } else {
+      val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
+      val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      store.readIndexFiles()
+      store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
+    }
 
     assert(dataFiles.size == partitions.size)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0422239..f443214 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 object DataLoadProcessorStepOnSpark {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 2ba6e5e..3aaf0ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -74,7 +75,8 @@ class CarbonIUDMergerRDD[K, V](
     val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 
     // group blocks by segment.
-    val splitsGroupedMySegment = carbonInputSplits.groupBy(_.getSegmentId)
+    val splitsGroupedMySegment =
+      carbonInputSplits.groupBy(_.getSegmentId)
 
     var i = -1
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index d29284f..2fca57e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block._
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
@@ -133,7 +134,7 @@ class CarbonMergerRDD[K, V](
             .toList
         }
         mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
-          tableBlockInfoList.get(0).getSegmentId
+          tableBlockInfoList.get(0).getSegment.toString
         } else {
           mergedLoadName.substring(
             mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
@@ -326,7 +327,9 @@ class CarbonMergerRDD[K, V](
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
           entry.getStart, entry.getSegmentId,
           entry.getLocations, entry.getLength, entry.getVersion,
-          updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString, entry.getSegmentId)
+          updateStatusManager.getDeleteDeltaFilePath(
+            entry.getPath.toString,
+            Segment.toSegment(entry.getSegmentId).getSegmentNo)
         )
         (!updated || (updated && (!CarbonUtil
           .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 77ff139..3995aa7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -530,6 +530,23 @@ object CommonUtil {
   }
 
   /**
+   * This method will validate the flat folder property specified by the user
+   *
+   * @param tableProperties table properties; a valid flat folder value is stored back trimmed
+   */
+  def validateFlatFolder(tableProperties: Map[String, String]): Unit = {
+    val tblPropName = CarbonCommonConstants.FLAT_FOLDER
+    if (tableProperties.get(tblPropName).isDefined) {
+      val trimStr = tableProperties(tblPropName).trim
+      if (!trimStr.equalsIgnoreCase("true") && !trimStr.equalsIgnoreCase("false")) {
+        throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+                                                  s"$trimStr, only true|false is supported.")
+      }
+      tableProperties.put(tblPropName, trimStr)
+    }
+  }
+
+  /**
    * This method will validate the compaction level threshold property specified by the user
    * the property is used while doing minor compaction
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 61a5b42..7d28790 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -301,6 +301,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     CommonUtil.validateTableBlockSize(tableProperties)
     // validate table level properties for compaction
     CommonUtil.validateTableLevelCompactionProperties(tableProperties)
+    // validate flat folder property.
+    CommonUtil.validateFlatFolder(tableProperties)
 
     TableModel(
       ifNotExistPresent,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 84d9c47..fdbf400 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -25,18 +25,23 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.sql.execution.command.{AlterPartitionModel, DataMapField, Field, PartitionerField}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -192,9 +197,13 @@ object PartitionUtils {
         val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
         val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
         val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
-        val indexFilePath = CarbonTablePath.getCarbonIndexFilePath(
-          tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
-          timestamp, version)
+        val indexFilePath =
+          new Path(new Path(path).getParent,
+            CarbonTablePath.getCarbonIndexFileName(taskId,
+            bucketNumber.toInt,
+            batchNo,
+            timestamp,
+            segmentId)).toString
         // indexFilePath could be duplicated when multiple data file related to one index file
         if (indexFilePath != null && !pathList.contains(indexFilePath)) {
           pathList.add(indexFilePath)
@@ -209,11 +218,13 @@ object PartitionUtils {
     CarbonUtil.deleteFiles(files.asScala.toArray)
     if (!files.isEmpty) {
       val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val file = SegmentFileStore.writeSegmentFile(
-        identifier.getTablePath,
-        alterPartitionModel.segmentId,
-        alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
-      val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file, null))
+      val updatedSegFile: String = mergeAndUpdateSegmentFile(alterPartitionModel,
+        identifier,
+        segmentId,
+        carbonTable,
+        files.asScala)
+
+      val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, updatedSegFile, null))
         .asJava
       if (!CarbonUpdateUtil.updateTableMetadataStatus(
         new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId,
@@ -283,4 +294,50 @@ object PartitionUtils {
     generatePartitionerField(allPartitionColumn.toList, Seq.empty)
   }
 
+
+  private def mergeAndUpdateSegmentFile(alterPartitionModel: AlterPartitionModel,
+      identifier: AbsoluteTableIdentifier,
+      segmentId: String,
+      carbonTable: CarbonTable, filesToBeDelete: Seq[File]) = {
+    val metadataDetails =
+      SegmentStatusManager.readTableStatusFile(
+        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+    val segmentFile =
+      metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile
+    var allSegmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
+    val file = SegmentFileStore.writeSegmentFile(
+      carbonTable,
+      alterPartitionModel.segmentId,
+      System.currentTimeMillis().toString)
+    if (segmentFile != null) {
+      allSegmentFiles ++= FileFactory.getCarbonFile(
+        SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
+    }
+    val updatedSegFile = {
+      val carbonFile = FileFactory.getCarbonFile(
+        SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
+      allSegmentFiles ++= carbonFile :: Nil
+
+      val mergedSegFileName = SegmentFileStore.genSegmentFileName(
+        segmentId,
+        alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
+      val tmpFile = mergedSegFileName + "_tmp"
+      val segmentStoreFile = SegmentFileStore.mergeSegmentFiles(
+        tmpFile,
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
+        allSegmentFiles.toArray)
+      val indexFiles = segmentStoreFile.getLocationMap.values().asScala.head.getFiles
+      filesToBeDelete.foreach(f => indexFiles.remove(f.getName))
+      SegmentFileStore.writeSegmentFile(
+        segmentStoreFile,
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + mergedSegFileName + CarbonTablePath.SEGMENT_EXT)
+      carbonFile.delete()
+      FileFactory.getCarbonFile(
+        SegmentFileStore.getSegmentFilePath(
+          carbonTable.getTablePath, tmpFile + CarbonTablePath.SEGMENT_EXT)).delete()
+      mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+    }
+    updatedSegFile
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 5902783..f3f2650 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -84,9 +84,7 @@ object IndexDataMapRebuildRDD {
       segmentId: String): Unit = {
 
     val dataMapStorePath =
-      CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) +
-      File.separator +
-      dataMapName
+      CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, segmentId, dataMapName)
 
     if (!FileFactory.isFileExist(dataMapStorePath)) {
       if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 21a8641..5d53ccc 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -50,6 +50,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
@@ -434,13 +436,7 @@ object CarbonDataRDDFactory {
             segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
           }
         }
-        val segmentFiles = segmentDetails.asScala.map{seg =>
-          val file = SegmentFileStore.writeSegmentFile(
-            carbonTable.getTablePath,
-            seg.getSegmentNo,
-            updateModel.get.updatedTimeStamp.toString)
-          new Segment(seg.getSegmentNo, file)
-        }.filter(_.getSegmentFileName != null).asJava
+        val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)
 
         // this means that the update doesnt have any records to update so no need to do table
         // status file updation.
@@ -517,9 +513,13 @@ object CarbonDataRDDFactory {
       writeDictionary(carbonLoadModel, result, writeAll = false)
 
       val segmentFileName =
-        SegmentFileStore.writeSegmentFile(carbonTable.getTablePath, carbonLoadModel.getSegmentId,
+        SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
           String.valueOf(carbonLoadModel.getFactTimeStamp))
 
+      SegmentFileStore.updateSegmentFile(
+        carbonTable.getTablePath,
+        carbonLoadModel.getSegmentId,
+        segmentFileName)
       operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
         carbonLoadModel.getSegmentId)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
@@ -588,6 +588,58 @@ object CarbonDataRDDFactory {
   }
 
   /**
+   * Add and update the segment files. In case of update scenario the carbonindex files are written
+   * to the same segment so we need to update old segment file. This method writes the latest data
+   * to a new segment file and merges it with the old file to get the latest updated files.
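+   * @param carbonTable table whose segment files are rewritten
+   * @param segmentDetails segments touched by the update
+   * @return segments carrying the merged segment file name; segments with no file are dropped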
+   */
+  private def updateSegmentFiles(
+      carbonTable: CarbonTable,
+      segmentDetails: util.HashSet[Segment],
+      updateModel: UpdateTableModel) = {
+    val metadataDetails =
+      SegmentStatusManager.readTableStatusFile(
+        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+    val segmentFiles = segmentDetails.asScala.map { seg =>
+      val segmentFile =
+        metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get.getSegmentFile
+      var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
+
+      val file = SegmentFileStore.writeSegmentFile(
+        carbonTable,
+        seg.getSegmentNo,
+        String.valueOf(System.currentTimeMillis()))
+
+      if (segmentFile != null) {
+        segmentFiles ++= FileFactory.getCarbonFile(
+          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
+      }
+      val updatedSegFile = if (file != null) {
+        val carbonFile = FileFactory.getCarbonFile(
+          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
+        segmentFiles ++= carbonFile :: Nil
+
+        val mergedSegFileName = SegmentFileStore.genSegmentFileName(
+          seg.getSegmentNo,
+          updateModel.updatedTimeStamp.toString)
+        SegmentFileStore.mergeSegmentFiles(
+          mergedSegFileName,
+          CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
+          segmentFiles.toArray)
+        carbonFile.delete()
+        mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+      } else {
+        null
+      }
+
+      new Segment(seg.getSegmentNo, updatedSegFile)
+    }.filter(_.getSegmentFileName != null).asJava
+    segmentFiles
+  }
+
+  /**
    * If data load is triggered by UPDATE query, this func will execute the update
    * TODO: move it to a separate update command
    */
@@ -614,10 +666,11 @@ object CarbonDataRDDFactory {
         carbonTable.getMetadataPath)
         .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
                        lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
-      val segmentIds = loadMetadataDetails.map(_.getLoadName)
-      val segmentIdIndex = segmentIds.zipWithIndex.toMap
-      val segmentId2maxTaskNo = segmentIds.map { segId =>
-        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath))
+      val segments = loadMetadataDetails.map(f => new Segment(f.getLoadName, f.getSegmentFile))
+      val segmentIdIndex = segments.map(_.getSegmentNo).zipWithIndex.toMap
+      val segmentId2maxTaskNo = segments.map { seg =>
+        (seg.getSegmentNo,
+          CarbonUpdateUtil.getLatestTaskIdForSegment(seg, carbonLoadModel.getTablePath))
       }.toMap
 
       class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
@@ -639,10 +692,14 @@ object CarbonDataRDDFactory {
         val partitionId = TaskContext.getPartitionId()
         val segIdIndex = partitionId / segmentUpdateParallelism
         val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
-        val segId = segmentIds(segIdIndex)
-        val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
-        List(triggerDataLoadForSegment(carbonLoadModel, updateModel, segId, newTaskNo, partition)
-          .toList).toIterator
+        val segId = segments(segIdIndex)
+        val newTaskNo = segmentId2maxTaskNo(segId.getSegmentNo) + randomPart + 1
+        List(triggerDataLoadForSegment(
+          carbonLoadModel,
+          updateModel,
+          segId.getSegmentNo,
+          newTaskNo,
+          partition).toList).toIterator
       }.collect()
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 155bdd1..7605b9d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -223,7 +223,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
           val segmentFilesList = loadsToMerge.asScala.map{seg =>
             val file = SegmentFileStore.writeSegmentFile(
-              carbonTable.getTablePath,
+              carbonTable,
               seg.getLoadName,
               carbonLoadModel.getFactTimeStamp.toString)
             new Segment(seg.getLoadName, file)
@@ -231,7 +231,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
         } else {
           segmentFileName = SegmentFileStore.writeSegmentFile(
-            carbonTable.getTablePath,
+            carbonTable,
             mergedLoadNumber,
             carbonLoadModel.getFactTimeStamp.toString)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 93c0b4a..30cb464 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -99,7 +99,7 @@ class CarbonSession(@transient val sc: SparkContext,
             trySearchMode(qe, sse)
           } catch {
             case e: Exception =>
-              logError(String.format(
+              log.error(String.format(
                 "Exception when executing search mode: %s", e.getMessage))
               throw e;
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 0c6d2ba..127e1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -68,12 +68,7 @@ object DeleteExecution {
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val isPartitionTable = carbonTable.isHivePartitionTable
-    val factPath = if (isPartitionTable) {
-      absoluteTableIdentifier.getTablePath
-    } else {
-      CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
-    }
+    val tablePath = absoluteTableIdentifier.getTablePath
     var segmentsTobeDeleted = Seq.empty[Segment]
 
     val deleteRdd = if (isUpdateOperation) {
@@ -114,6 +109,9 @@ object DeleteExecution {
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
 
+    val metadataDetails = SegmentStatusManager.readTableStatusFile(
+      CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+
     val rowContRdd =
       sparkSession.sparkContext.parallelize(
         blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
@@ -127,12 +125,16 @@ object DeleteExecution {
           var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]()
           while (records.hasNext) {
             val ((key), (rowCountDetailsVO, groupedRows)) = records.next
+            val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
+            val segmentFile =
+              metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile
             result = result ++
                      deleteDeltaFunc(index,
                        key,
                        groupedRows.toIterator,
                        timestamp,
-                       rowCountDetailsVO)
+                       rowCountDetailsVO,
+                       segmentFile)
           }
           result
         }
@@ -219,7 +221,8 @@ object DeleteExecution {
         key: String,
         iter: Iterator[Row],
         timestamp: String,
-        rowCountDetailsVO: RowCountDetailsVO
+        rowCountDetailsVO: RowCountDetailsVO,
+        segmentFile: String
     ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = {
 
       val result = new DeleteDelataResultImpl()
@@ -255,7 +258,7 @@ object DeleteExecution {
             countOfRows = countOfRows + 1
           }
 
-          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath, isPartitionTable)
+          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, tablePath, segmentFile != null)
           val completeBlockName = CarbonTablePath
             .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
                                CarbonCommonConstants.FACT_FILE_EXT)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index f3b4be7..857cd81 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -103,6 +103,9 @@ case class PreAggregateTableHelper(
       .LOAD_SORT_SCOPE_DEFAULT))
     tableProperties
       .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+    tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
+      parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
+        CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
     val tableIdentifier =
       TableIdentifier(parentTable.getTableName + "_" + dataMapName,
         Some(parentTable.getDatabaseName))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 7d15cc1..617f5e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -138,6 +138,11 @@ private[sql] case class CarbonDescribeFormattedCommand(
         tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS),
         CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT))
     }
+    if (tblProps.containsKey(CarbonCommonConstants.FLAT_FOLDER)) {
+      results ++= Seq((CarbonCommonConstants.FLAT_FOLDER.toUpperCase,
+        tblProps.get(CarbonCommonConstants.FLAT_FOLDER),
+        CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
+    }
 
     results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
     if (colPropStr.length() > 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 0bdef8a..7cee409 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -20,13 +20,15 @@ package org.apache.carbondata.spark.testsuite.partition
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -855,15 +857,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     validatePartitionTableFiles(partitions, dataFiles)
   }
 
-  def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
-    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".carbondata")
-      }
-    })
-    dataFiles
+  def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[String] = {
+    val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
+    if (segment.getSegmentFileName != null) {
+      val sfs = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      sfs.readIndexFiles()
+      val indexFilesMap = sfs.getIndexFilesMap
+      val dataFiles = indexFilesMap.asScala.flatMap(_._2.asScala).map(f => new Path(f).getName)
+      dataFiles.toArray
+    } else {
+      val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+      val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+      val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          return file.getName.endsWith(".carbondata")
+        }
+      })
+      dataFiles.map(_.getName)
+    }
   }
 
   /**
@@ -871,10 +882,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
    * @param partitions
    * @param dataFiles
    */
-  def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[CarbonFile]): Unit = {
+  def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[String]): Unit = {
     val partitionIds: ListBuffer[Int] = new ListBuffer[Int]()
     dataFiles.foreach { dataFile =>
-      val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName).split("_")(0).toInt
+      val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile).split("_")(0).toInt
       partitionIds += partitionId
       assert(partitions.contains(partitionId))
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 48733dc..1c7cb10 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -43,9 +43,9 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
     // 2096 is the size of carbon table
-    assertResult(2096)(result(0).getLong(1))
+    assertResult(2098)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2096)(result(1).getLong(1))
+    assertResult(2098)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 3dc34d3..bfa498e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -165,6 +165,7 @@ public class DataMapWriterListener {
         writer.finish();
       }
     }
+    registry.clear();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
index eb02ede..fcabef5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -170,7 +170,8 @@ public abstract class AbstractDataLoadProcessorStep {
             carbonDataFileAttributes.getTaskId(),
             bucketId,
             0,
-            String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+            String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
+            configuration.getSegmentId()));
     return listener;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index bc28ace..5bed8b1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -64,7 +64,7 @@ public class TableProcessingOperations {
       CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
         @Override public boolean accept(CarbonFile path) {
           String segmentId =
-              CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+              CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
           boolean found = false;
           for (int j = 0; j < details.length; j++) {
             if (details[j].getLoadName().equals(segmentId)) {
@@ -76,8 +76,8 @@ public class TableProcessingOperations {
         }
       });
       for (int k = 0; k < listFiles.length; k++) {
-        String segmentId =
-            CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+        String segmentId = CarbonTablePath.DataFileUtil
+            .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
         if (isCompactionFlow) {
           if (segmentId.contains(".")) {
             CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 90c297e..4d3f3fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -83,7 +84,7 @@ public class CarbonLoadModel implements Serializable {
   /**
    * load Id
    */
-  private String segmentId;
+  private Segment segment;
 
   private String allDictPath;
 
@@ -424,7 +425,7 @@ public class CarbonLoadModel implements Serializable {
     copy.blocksID = blocksID;
     copy.taskNo = taskNo;
     copy.factTimeStamp = factTimeStamp;
-    copy.segmentId = segmentId;
+    copy.segment = segment;
     copy.serializationNullFormat = serializationNullFormat;
     copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
     copy.badRecordsAction = badRecordsAction;
@@ -479,7 +480,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.blocksID = blocksID;
     copyObj.taskNo = taskNo;
     copyObj.factTimeStamp = factTimeStamp;
-    copyObj.segmentId = segmentId;
+    copyObj.segment = segment;
     copyObj.serializationNullFormat = serializationNullFormat;
     copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
     copyObj.badRecordsAction = badRecordsAction;
@@ -609,14 +610,24 @@ public class CarbonLoadModel implements Serializable {
    * @return load Id
    */
   public String getSegmentId() {
-    return segmentId;
+    if (segment != null) {
+      return segment.getSegmentNo();
+    } else {
+      return null;
+    }
   }
 
   /**
    * @param segmentId
    */
   public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
+    if (segmentId != null) {
+      this.segment = Segment.toSegment(segmentId);
+    }
+  }
+
+  public Segment getSegment() {
+    return segment;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index 7a11c8b..b22599d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.merger;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -46,10 +47,11 @@ public abstract class AbstractResultProcessor {
   public abstract void close();
 
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
-      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel)
+      throws IOException {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
-      long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
+      long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegment(),
           loadModel.getTablePath());
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index fef8ab9..dde18a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -175,7 +175,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
                   partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
                   partitionSpec.getPartitions());
         } catch (IOException e) {
-          isCompactionSuccess = false;
           throw e;
         }
       }
@@ -428,6 +427,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
             tempStoreLocation, carbonStoreLocation);
+    carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId());
     setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
     dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
         CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 9a3258e..b877d52 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -63,7 +63,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
 
   public RowResultMergerProcessor(String databaseName,
       String tableName, SegmentProperties segProp, String[] tempStoreLocation,
-      CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) {
+      CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec)
+      throws IOException {
     this.segprop = segProp;
     this.partitionSpec = partitionSpec;
     this.loadModel = loadModel;
@@ -84,6 +85,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
             tempStoreLocation, carbonStoreLocation);
     setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
+    carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 221697f..9b09269 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -59,6 +59,7 @@ public class RowResultProcessor {
     carbonFactDataHandlerModel.setBucketId(bucketId);
     //Note: set compaction flow just to convert decimal type
     carbonFactDataHandlerModel.setCompactionFlow(true);
+    carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 87a6de0..27249ab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -267,7 +267,8 @@ public class CarbonFactDataHandlerModel {
               carbonDataFileAttributes.getTaskId(),
               bucketId,
               0,
-              String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+              String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
+              configuration.getSegmentId()));
     }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
@@ -337,7 +338,8 @@ public class CarbonFactDataHandlerModel {
             CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(loadModel.getTaskNo()),
             carbonFactDataHandlerModel.getBucketId(),
             carbonFactDataHandlerModel.getTaskExtension(),
-            String.valueOf(loadModel.getFactTimeStamp())));
+            String.valueOf(loadModel.getFactTimeStamp()),
+            loadModel.getSegmentId()));
 
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     return carbonFactDataHandlerModel;