Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/16 02:56:24 UTC

[1/3] carbondata git commit: [CARBONDATA-2428] Support flat folder for managed carbon table

Repository: carbondata
Updated Branches:
  refs/heads/master 181f0ac9b -> 60dfdd385


http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 9d0c933..b76722b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -279,7 +279,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
     this.carbonDataFileName = CarbonTablePath
         .getCarbonDataFileName(fileCount, model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
-            "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+            "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
     this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + File.separator
         + carbonDataFileName;
     try {
@@ -368,7 +368,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       String rawFileName = model.getCarbonDataDirectoryPath() + File.separator + CarbonTablePath
           .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
               model.getBucketId(), model.getTaskExtension(),
-              "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+              "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
       indexFileName = FileFactory.getUpdatedFilePath(rawFileName, FileFactory.FileType.HDFS);
     } else {
       // randomly choose a temp location for index file
@@ -378,7 +378,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
       indexFileName = chosenTempLocation + File.separator + CarbonTablePath
           .getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
               model.getBucketId(), model.getTaskExtension(),
-              "" + model.getCarbonDataFileAttributes().getFactTimeStamp());
+              "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), model.getSegmentId());
     }
 
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
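
Note: with the flat-folder layout, data and index files of every segment sit directly under the table path instead of under Fact/Part0/Segment_<n>, so the segment id must be recoverable from the file name itself; that is why getCarbonDataFileName and getCarbonIndexFileName now receive model.getSegmentId(). A minimal sketch of the recovery step, assuming a hypothetical name layout (the real tokenization lives in CarbonTablePath):

    object FlatFolderGrouping {
      // Assumed layout for illustration only: the segment id is the token
      // before the trailing timestamp, e.g. "...-<segmentId>-<timestamp>.carbondata".
      def segmentIdOf(fileName: String): String = {
        val tokens = fileName.stripSuffix(".carbondata").split("-")
        tokens(tokens.length - 2)
      }

      // Groups a flat folder's files back into their segments.
      def groupBySegment(fileNames: Seq[String]): Map[String, Seq[String]] =
        fileNames.groupBy(segmentIdOf)
    }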

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 0ea7223..da77cf6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -601,7 +601,9 @@ public final class CarbonLoaderUtil {
     long sizePerNode = 0;
     long totalFileSize = 0;
     if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
-      sizePerNode = blockInfos.size() / noofNodes;
+      if (blockInfos.size() > 0) {
+        sizePerNode = blockInfos.size() / noofNodes;
+      }
       sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
     } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy
         || BlockAssignmentStrategy.NODE_MIN_SIZE_FIRST == blockAssignmentStrategy) {
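
Note: the new size() > 0 guard keeps the BLOCK_NUM_FIRST path from dividing when the block list is empty, a case where noofNodes can be zero as well; sizePerNode then falls back to the floor value of 1. A minimal sketch of the guarded computation, with hypothetical parameter names:

    def sizePerNode(blockCount: Int, nodeCount: Int): Long = {
      // Skip the division entirely for an empty block list, so a zero
      // node count cannot raise an ArithmeticException.
      val quotient = if (blockCount > 0) blockCount / nodeCount else 0
      if (quotient <= 0) 1L else quotient.toLong
    }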

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
----------------------------------------------------------------------
diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
index f6406c7..4bfadce 100644
--- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
+++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java
@@ -37,15 +37,18 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
-import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope;
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor;
 import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.model.QueryModelBuilder;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonTaskInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.CarbonRecordReader;
@@ -164,12 +167,15 @@ public class SearchRequestHandler {
     Objects.requireNonNull(datamap);
     List<Segment> segments = new LinkedList<>();
     HashMap<String, Integer> uniqueSegments = new HashMap<>();
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(
+            CarbonTablePath.getMetadataPath(table.getTablePath()));
     for (CarbonInputSplit split : mbSplit.getAllSplits()) {
-      String segmentId = split.getSegmentId();
+      String segmentId = Segment.getSegment(split.getSegmentId(), loadMetadataDetails).toString();
       if (uniqueSegments.get(segmentId) == null) {
-        segments.add(Segment.toSegment(
-                segmentId,
-                new LatestFilesReadCommittedScope(table.getTablePath(), segmentId)));
+        segments.add(Segment.toSegment(segmentId,
+            new TableStatusReadCommittedScope(table.getAbsoluteTableIdentifier(),
+                loadMetadataDetails)));
         uniqueSegments.put(segmentId, 1);
       } else {
         uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1);
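
Note: the handler now reads the table status once and resolves every split's segment against those LoadMetadataDetails, replacing the file-listing LatestFilesReadCommittedScope with TableStatusReadCommittedScope so that flat-folder tables are read through segment metadata rather than by scanning directories. The uniqueSegments map serves both as a dedup set and as a per-segment split counter; a minimal sketch of that pattern, with a hypothetical input:

    // splitSegmentIds stands in for the ids of mbSplit.getAllSplits().
    def countSplitsPerSegment(splitSegmentIds: Seq[String]): Map[String, Int] =
      splitSegmentIds.groupBy(identity).map { case (id, hits) => id -> hits.size }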

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 4653445..bd622f0 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -128,7 +128,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
     segmentDir = CarbonTablePath.getSegmentPath(
         carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId);
-    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
+    fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0", segmentId);
   }
 
   private void initializeAtFirstRow() throws IOException, InterruptedException {


[2/3] carbondata git commit: [CARBONDATA-2428] Support flat folder for managed carbon table

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 99b536c..b4937e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,12 +17,15 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
@@ -61,7 +64,7 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
@@ -84,7 +87,7 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter(table)
@@ -109,7 +112,7 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter(table)
@@ -138,7 +141,7 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
@@ -167,7 +170,7 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
     new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
@@ -190,18 +193,32 @@ class CarbonIndexFileMergeTestCase
     sql("select * from mitable").show()
   }
 
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
-    val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
-    val path = CarbonTablePath
-      .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
-    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
-        .INDEX_FILE_EXT)
-    })
-    if (carbonFiles != null) {
-      carbonFiles.length
+  private def getIndexFileCount(tableName: String, segmentNo: String): Int = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir)
+      indexFiles.asScala.map { f =>
+        if (f._2 == null) {
+          1
+        } else {
+          0
+        }
+      }.sum
     } else {
-      0
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      if (segment != null) {
+        val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+        store.getSegmentFile.getLocationMap.values().asScala.map { f =>
+          if (f.getMergeFileName == null) {
+            f.getFiles.size()
+          } else {
+            0
+          }
+        }.sum
+      } else {
+        0
+      }
     }
   }
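
Note: getIndexFileCount is now layout-aware: if the physical Segment_<n> directory still exists it counts index files that have no merge-file mapping, otherwise it resolves the segment through SegmentFileStore and counts only entries without a merge file name, so fully merged segments report zero in both layouts. The same two-path helper recurs in the compaction and data-load suites that follow. A minimal sketch of the counting rule:

    // mergeNameByIndex maps an index file name to its merge file name,
    // or null when the index file has not been merged (hypothetical input).
    def unmergedIndexCount(mergeNameByIndex: Map[String, String]): Int =
      mergeNameByIndex.count { case (_, merged) => merged == null }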
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 8f891ce..d49b962 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -22,9 +22,12 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   val filePath: String = s"$resourcesPath/globalsort"
@@ -527,8 +530,12 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
-    val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
-                segmentNo
-    new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 2da1ada..54c19c2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -23,8 +23,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   val filePath: String = s"$resourcesPath/globalsort"
@@ -567,8 +570,12 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
-    val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
-                segmentNo
-    new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index f3e12d1..c695b05 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -26,8 +26,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -189,12 +191,14 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
   }
 
   def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
-      tableName
-    )
-    val segmentDir = carbonTable.getSegmentPath(segmentNo)
-    new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+    }
   }
 
   override def afterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index b9d8e12..39785a3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import scala.collection.JavaConverters._
+
 import java.io.{File, FilenameFilter}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -26,7 +28,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 
 class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
   var originVersion = ""
@@ -49,12 +53,20 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     val indexReader = new CarbonIndexFileReader()
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
-    val carbonIndexPaths = new File(segmentDir)
-      .listFiles(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          name.endsWith(CarbonTablePath.getCarbonIndexExtension)
-        }
-      })
+
+    val carbonIndexPaths = if (FileFactory.isFileExist(segmentDir)) {
+      new File(segmentDir)
+        .listFiles(new FilenameFilter {
+          override def accept(dir: File, name: String): Boolean = {
+            name.endsWith(CarbonTablePath.getCarbonIndexExtension)
+          }
+        })
+    } else {
+      val segment = Segment.getSegment("0", carbonTable.getTablePath)
+      val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      store.readIndexFiles()
+      store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
+    }
     for (carbonIndexPath <- carbonIndexPaths) {
       indexReader.openThriftReader(carbonIndexPath.getCanonicalPath)
       assert(indexReader.readIndexHeader().getVersion === 3)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index bba75ad..d7b1172 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import scala.collection.JavaConverters._
+
 import java.io.{File, FileWriter}
 
 import org.apache.commons.io.FileUtils
@@ -31,8 +33,10 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -273,7 +277,15 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
-    assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+    if (FileFactory.isFileExist(segmentDir)) {
+      assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+    } else {
+      val segment = Segment.getSegment("0", carbonTable.getTablePath)
+      val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      store.readIndexFiles()
+      val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
+      assertResult(Math.max(7, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
+    }
   }
 
   test("Query with small files") {
@@ -379,6 +391,11 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
     val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
-    new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    if (FileFactory.isFileExist(segmentDir)) {
+      new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+    } else {
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b5c3df1..074c807 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -64,8 +64,9 @@ class CGDataMapFactory(
    * Get the datamap for segmentid
    */
   override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = {
-    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)
-    val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+    val path = identifier.getTablePath
+    val file = FileFactory.getCarbonFile(
+      path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
 
     val files = file.listFiles()
     files.map {f =>
@@ -100,8 +101,9 @@ class CGDataMapFactory(
    * @return
    */
   override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
-    val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)
-    val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+    val path = identifier.getTablePath
+    val file = FileFactory.getCarbonFile(
+      path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
 
     val files = file.listFiles()
     files.map { f =>
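
Note: with no per-segment directory left to nest into, both test datamap factories now keep their files under <tablePath>/<dataMapName>/<segmentNo> rather than under the segment path. A one-line sketch of the relocated layout, names taken from the diff:

    def dataMapDir(tablePath: String, dataMapName: String, segmentNo: String): String =
      s"$tablePath/$dataMapName/$segmentNo"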

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 2d666c3..08d8911 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -89,8 +89,9 @@ class FGDataMapFactory(carbonTable: CarbonTable,
    * @return
    */
   override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
-    val path = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segment.getSegmentNo)
-    val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+    val path = carbonTable.getTablePath
+    val file = FileFactory.getCarbonFile(
+      path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
 
     val files = file.listFiles()
     files.map { f =>
@@ -416,7 +417,6 @@ class FGDataMapWriter(carbonTable: CarbonTable,
     stream.write(bytes)
     stream.writeInt(bytes.length)
     stream.close()
-//    commitFile(fgwritepath)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index eaa2ae7..642607c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.datamap
 
+import scala.collection.JavaConverters._
+
 import java.io.{File, FilenameFilter}
 
 import org.apache.spark.sql.Row
@@ -26,7 +28,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -261,12 +265,21 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
          |    group by name
        """.stripMargin)
     assertResult(true)(new File(path).exists())
-    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
-      .list(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
-        }
-      }).length > 0)
+    if (FileFactory.isFileExist(CarbonTablePath.getSegmentPath(path, "0"))) {
+      assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+         .list(new FilenameFilter {
+           override def accept(dir: File, name: String): Boolean = {
+             name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+           }
+         }).length > 0)
+    } else {
+      val segment = Segment.getSegment("0", path)
+      val store = new SegmentFileStore(path, segment.getSegmentFileName)
+      store.readIndexFiles()
+      val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
+      assertResult(true)(size > 0)
+    }
+
     checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
     checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
     sql("drop datamap preagg on table main")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
new file mode 100644
index 0000000..d786d10
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.flatfolder
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  def validateDataFiles(tableUniqueName: String, segmentId: String): Unit = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val files = FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+    assert(files.exists(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)))
+  }
+
+  test("data loading for flat folder with global sort") {
+    sql(
+      """
+        | CREATE TABLE flatfolder_gs (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('sort_scope'='global_sort', 'flat_folder'='true')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_gs OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_flatfolder_gs", "0")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from flatfolder_gs order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("data loading for flat folder") {
+    sql(
+      """
+        | CREATE TABLE flatfolder (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_flatfolder", "0")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from flatfolder order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("data loading for flat folder pre-agg") {
+    sql(
+      """
+        | CREATE TABLE flatfolder_preagg (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+      """.stripMargin)
+    sql("create datamap p2 on table flatfolder_preagg using 'preaggregate' as select empname, designation, min(salary) from flatfolder_preagg group by empname, designation ")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_preagg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_flatfolder_preagg", "0")
+    validateDataFiles("default_flatfolder_preagg_p2", "0")
+
+    checkAnswer(sql("select empname, designation, min(salary) from flatfolder_preagg group by empname, designation"),
+      sql("select empname, designation, min(salary) from originTable group by empname, designation"))
+
+  }
+
+  override def afterAll = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
+      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists flatfolder")
+    sql("drop table if exists flatfolder_gs")
+    sql("drop table if exists flatfolder_preagg")
+  }
+
+}
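
Note: the new suite exercises the feature end to end through the flat_folder table property (plain load, global sort, and a preaggregate datamap); validateDataFiles only asserts that at least one .carbondata file sits directly under the table path. A minimal sketch of that invariant using plain java.io instead of FileFactory; tablePath is hypothetical:

    import java.io.File

    def hasFlatFolderData(tablePath: String): Boolean =
      Option(new File(tablePath).listFiles())
        .exists(_.exists(_.getName.endsWith(".carbondata")))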

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index ec39f66..2432715 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -26,6 +26,9 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
 class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
 
@@ -689,7 +692,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
                      CarbonCommonConstants.FILE_SEPARATOR + "t" +
                      CarbonCommonConstants.FILE_SEPARATOR + "Fact" +
                      CarbonCommonConstants.FILE_SEPARATOR + "Part0")
-    assert(f.list().length == 2)
+    if (!FileFactory.isFileExist(
+      CarbonTablePath.getSegmentFilesLocation(
+        dblocation + CarbonCommonConstants.FILE_SEPARATOR +
+        CarbonCommonConstants.FILE_SEPARATOR + "t"))) {
+      assert(f.list().length == 2)
+    }
   }
   test("test sentences func in update statement") {
     sql("drop table if exists senten")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 0eaaec5..133454a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -16,17 +16,22 @@
  */
 package org.apache.carbondata.spark.testsuite.partition
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datamap.Segment
+
 class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
@@ -62,12 +67,20 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
   def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val segmentDir = carbonTable.getSegmentPath(segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".carbondata")
-      }
-    })
+
+    val dataFiles = if (FileFactory.isFileExist(segmentDir)) {
+      val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+      carbonFile.listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          return file.getName.endsWith(".carbondata")
+        }
+      })
+    } else {
+      val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
+      val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      store.readIndexFiles()
+      store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
+    }
 
     assert(dataFiles.size == partitions.size)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0422239..f443214 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 object DataLoadProcessorStepOnSpark {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 2ba6e5e..3aaf0ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{Partition, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -74,7 +75,8 @@ class CarbonIUDMergerRDD[K, V](
     val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
 
     // group blocks by segment.
-    val splitsGroupedMySegment = carbonInputSplits.groupBy(_.getSegmentId)
+    val splitsGroupedMySegment =
+      carbonInputSplits.groupBy(_.getSegmentId)
 
     var i = -1
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index d29284f..2fca57e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block._
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
@@ -133,7 +134,7 @@ class CarbonMergerRDD[K, V](
             .toList
         }
         mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
-          tableBlockInfoList.get(0).getSegmentId
+          tableBlockInfoList.get(0).getSegment.toString
         } else {
           mergedLoadName.substring(
             mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
@@ -326,7 +327,9 @@ class CarbonMergerRDD[K, V](
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
           entry.getStart, entry.getSegmentId,
           entry.getLocations, entry.getLength, entry.getVersion,
-          updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString, entry.getSegmentId)
+          updateStatusManager.getDeleteDeltaFilePath(
+            entry.getPath.toString,
+            Segment.toSegment(entry.getSegmentId).getSegmentNo)
         )
         (!updated || (updated && (!CarbonUtil
           .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
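
Note: a split's "segment id" can now be a composite Segment string (segment number plus segment file name), so the merge number is taken from getSegment.toString and the delete-delta lookup first reduces the composite id back to the bare segment number via Segment.toSegment(...). A minimal sketch of that reduction, assuming '#' as the separator produced by Segment.toString:

    // Reduces a composite "segmentNo#segmentFileName" id to the segment
    // number; the '#' separator is an assumption for illustration.
    def segmentNoOf(compositeId: String): String = compositeId.split("#")(0)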

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 77ff139..3995aa7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -530,6 +530,23 @@ object CommonUtil {
   }
 
   /**
+   * This method will validate the flat folder property specified by the user
+   *
+   * @param tableProperties
+   */
+  def validateFlatFolder(tableProperties: Map[String, String]): Unit = {
+    val tblPropName = CarbonCommonConstants.FLAT_FOLDER
+    if (tableProperties.get(tblPropName).isDefined) {
+      val trimStr = tableProperties(tblPropName).trim
+      if (!trimStr.equalsIgnoreCase("true") && !trimStr.equalsIgnoreCase("false")) {
+        throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+                                                  s"$trimStr, only true|false is supported.")
+      }
+      tableProperties.put(tblPropName, trimStr)
+    }
+  }
+
+  /**
    * This method will validate the compaction level threshold property specified by the user
    * the property is used while doing minor compaction
    *
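
Note: validateFlatFolder accepts only the strings true/false (trimmed, case-insensitive) and writes the trimmed value back into the mutable property map; the parser change that follows wires it into table creation. A minimal standalone sketch of the same check, throwing IllegalArgumentException in place of MalformedCarbonCommandException:

    import scala.collection.mutable

    def validateBooleanProperty(props: mutable.Map[String, String], propName: String): Unit =
      props.get(propName).map(_.trim).foreach { v =>
        if (!v.equalsIgnoreCase("true") && !v.equalsIgnoreCase("false")) {
          throw new IllegalArgumentException(
            s"Invalid $propName value found: $v, only true|false is supported.")
        }
        // Store the trimmed value back, mirroring the validator above.
        props.put(propName, v)
      }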

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 61a5b42..7d28790 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -301,6 +301,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     CommonUtil.validateTableBlockSize(tableProperties)
     // validate table level properties for compaction
     CommonUtil.validateTableLevelCompactionProperties(tableProperties)
+    // validate flat folder property.
+    CommonUtil.validateFlatFolder(tableProperties)
 
     TableModel(
       ifNotExistPresent,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 84d9c47..fdbf400 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -25,18 +25,23 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.sql.execution.command.{AlterPartitionModel, DataMapField, Field, PartitionerField}
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -192,9 +197,13 @@ object PartitionUtils {
         val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
         val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
         val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
-        val indexFilePath = CarbonTablePath.getCarbonIndexFilePath(
-          tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
-          timestamp, version)
+        val indexFilePath =
+          new Path(new Path(path).getParent,
+            CarbonTablePath.getCarbonIndexFileName(taskId,
+            bucketNumber.toInt,
+            batchNo,
+            timestamp,
+            segmentId)).toString
         // indexFilePath could be duplicated when multiple data files relate to one index file
         if (indexFilePath != null && !pathList.contains(indexFilePath)) {
           pathList.add(indexFilePath)
@@ -209,11 +218,13 @@ object PartitionUtils {
     CarbonUtil.deleteFiles(files.asScala.toArray)
     if (!files.isEmpty) {
       val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val file = SegmentFileStore.writeSegmentFile(
-        identifier.getTablePath,
-        alterPartitionModel.segmentId,
-        alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
-      val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file, null))
+      val updatedSegFile: String = mergeAndUpdateSegmentFile(alterPartitionModel,
+        identifier,
+        segmentId,
+        carbonTable,
+        files.asScala)
+
+      val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, updatedSegFile, null))
         .asJava
       if (!CarbonUpdateUtil.updateTableMetadataStatus(
         new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId,
@@ -283,4 +294,50 @@ object PartitionUtils {
     generatePartitionerField(allPartitionColumn.toList, Seq.empty)
   }
 
+
+  private def mergeAndUpdateSegmentFile(alterPartitionModel: AlterPartitionModel,
+      identifier: AbsoluteTableIdentifier,
+      segmentId: String,
+      carbonTable: CarbonTable, filesToBeDelete: Seq[File]) = {
+    val metadataDetails =
+      SegmentStatusManager.readTableStatusFile(
+        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+    val segmentFile =
+      metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile
+    var allSegmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
+    val file = SegmentFileStore.writeSegmentFile(
+      carbonTable,
+      alterPartitionModel.segmentId,
+      System.currentTimeMillis().toString)
+    if (segmentFile != null) {
+      allSegmentFiles ++= FileFactory.getCarbonFile(
+        SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
+    }
+    val updatedSegFile = {
+      val carbonFile = FileFactory.getCarbonFile(
+        SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
+      allSegmentFiles ++= carbonFile :: Nil
+
+      val mergedSegFileName = SegmentFileStore.genSegmentFileName(
+        segmentId,
+        alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
+      val tmpFile = mergedSegFileName + "_tmp"
+      val segmentStoreFile = SegmentFileStore.mergeSegmentFiles(
+        tmpFile,
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
+        allSegmentFiles.toArray)
+      val indexFiles = segmentStoreFile.getLocationMap.values().asScala.head.getFiles
+      filesToBeDelete.foreach(f => indexFiles.remove(f.getName))
+      SegmentFileStore.writeSegmentFile(
+        segmentStoreFile,
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + mergedSegFileName + CarbonTablePath.SEGMENT_EXT)
+      carbonFile.delete()
+      FileFactory.getCarbonFile(
+        SegmentFileStore.getSegmentFilePath(
+          carbonTable.getTablePath, tmpFile + CarbonTablePath.SEGMENT_EXT)).delete()
+      mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+    }
+    updatedSegFile
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 5902783..f3f2650 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -84,9 +84,7 @@ object IndexDataMapRebuildRDD {
       segmentId: String): Unit = {
 
     val dataMapStorePath =
-      CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) +
-      File.separator +
-      dataMapName
+      CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, segmentId, dataMapName)
 
     if (!FileFactory.isFileExist(dataMapStorePath)) {
       if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 21a8641..5d53ccc 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -50,6 +50,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
@@ -434,13 +436,7 @@ object CarbonDataRDDFactory {
             segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
           }
         }
-        val segmentFiles = segmentDetails.asScala.map{seg =>
-          val file = SegmentFileStore.writeSegmentFile(
-            carbonTable.getTablePath,
-            seg.getSegmentNo,
-            updateModel.get.updatedTimeStamp.toString)
-          new Segment(seg.getSegmentNo, file)
-        }.filter(_.getSegmentFileName != null).asJava
+        val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)
 
         // this means that the update doesn't have any records to update, so there is
         // no need to update the table status file.
@@ -517,9 +513,13 @@ object CarbonDataRDDFactory {
       writeDictionary(carbonLoadModel, result, writeAll = false)
 
       val segmentFileName =
-        SegmentFileStore.writeSegmentFile(carbonTable.getTablePath, carbonLoadModel.getSegmentId,
+        SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
           String.valueOf(carbonLoadModel.getFactTimeStamp))
 
+      SegmentFileStore.updateSegmentFile(
+        carbonTable.getTablePath,
+        carbonLoadModel.getSegmentId,
+        segmentFileName)
       operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
         carbonLoadModel.getSegmentId)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
@@ -588,6 +588,58 @@ object CarbonDataRDDFactory {
   }
 
   /**
+   * Add and update the segment files. In the update scenario the carbonindex files are written
+   * to the same segment, so the old segment file needs to be updated. This method writes the
+   * latest data to a new segment file and merges it with the old file to get the latest
+   * updated files.
+   * @param carbonTable
+   * @param segmentDetails
+   * @param updateModel
+   * @return
+   */
+  private def updateSegmentFiles(
+      carbonTable: CarbonTable,
+      segmentDetails: util.HashSet[Segment],
+      updateModel: UpdateTableModel) = {
+    val metadataDetails =
+      SegmentStatusManager.readTableStatusFile(
+        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+    val segmentFiles = segmentDetails.asScala.map { seg =>
+      val segmentFile =
+        metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get.getSegmentFile
+      var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
+
+      val file = SegmentFileStore.writeSegmentFile(
+        carbonTable,
+        seg.getSegmentNo,
+        String.valueOf(System.currentTimeMillis()))
+
+      if (segmentFile != null) {
+        segmentFiles ++= FileFactory.getCarbonFile(
+          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
+      }
+      val updatedSegFile = if (file != null) {
+        val carbonFile = FileFactory.getCarbonFile(
+          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
+        segmentFiles ++= carbonFile :: Nil
+
+        val mergedSegFileName = SegmentFileStore.genSegmentFileName(
+          seg.getSegmentNo,
+          updateModel.updatedTimeStamp.toString)
+        SegmentFileStore.mergeSegmentFiles(
+          mergedSegFileName,
+          CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
+          segmentFiles.toArray)
+        carbonFile.delete()
+        mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+      } else {
+        null
+      }
+
+      new Segment(seg.getSegmentNo, updatedSegFile)
+    }.filter(_.getSegmentFileName != null).asJava
+    segmentFiles
+  }
+
+  /**
    * If data load is triggered by UPDATE query, this func will execute the update
    * TODO: move it to a separate update command
    */
@@ -614,10 +666,11 @@ object CarbonDataRDDFactory {
         carbonTable.getMetadataPath)
         .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
                        lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
-      val segmentIds = loadMetadataDetails.map(_.getLoadName)
-      val segmentIdIndex = segmentIds.zipWithIndex.toMap
-      val segmentId2maxTaskNo = segmentIds.map { segId =>
-        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath))
+      val segments = loadMetadataDetails.map(f => new Segment(f.getLoadName, f.getSegmentFile))
+      val segmentIdIndex = segments.map(_.getSegmentNo).zipWithIndex.toMap
+      val segmentId2maxTaskNo = segments.map { seg =>
+        (seg.getSegmentNo,
+          CarbonUpdateUtil.getLatestTaskIdForSegment(seg, carbonLoadModel.getTablePath))
       }.toMap
 
       class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
@@ -639,10 +692,14 @@ object CarbonDataRDDFactory {
         val partitionId = TaskContext.getPartitionId()
         val segIdIndex = partitionId / segmentUpdateParallelism
         val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
-        val segId = segmentIds(segIdIndex)
-        val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
-        List(triggerDataLoadForSegment(carbonLoadModel, updateModel, segId, newTaskNo, partition)
-          .toList).toIterator
+        val segId = segments(segIdIndex)
+        val newTaskNo = segmentId2maxTaskNo(segId.getSegmentNo) + randomPart + 1
+        List(triggerDataLoadForSegment(
+          carbonLoadModel,
+          updateModel,
+          segId.getSegmentNo,
+          newTaskNo,
+          partition).toList).toIterator
       }.collect()
     }
   }

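For orientation, a hedged sketch of the merge flow implemented by updateSegmentFiles above (variable names are hypothetical; the SegmentFileStore calls are the ones used in this diff):

    // Write a segment file for the newly written carbonindex files, then merge
    // it with the previously recorded one so that the segment keeps a single
    // up-to-date segment file.
    val newFileName = SegmentFileStore.writeSegmentFile(carbonTable, segmentNo, uuid)
    val mergedName = SegmentFileStore.genSegmentFileName(segmentNo, timestamp)
    SegmentFileStore.mergeSegmentFiles(
      mergedName,
      CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
      Array(oldSegmentFile, newSegmentFile))  // CarbonFile handles, hypothetical
    // The table status is then pointed at mergedName + CarbonTablePath.SEGMENT_EXT.
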
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 155bdd1..7605b9d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -223,7 +223,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
           val segmentFilesList = loadsToMerge.asScala.map{seg =>
             val file = SegmentFileStore.writeSegmentFile(
-              carbonTable.getTablePath,
+              carbonTable,
               seg.getLoadName,
               carbonLoadModel.getFactTimeStamp.toString)
             new Segment(seg.getLoadName, file)
@@ -231,7 +231,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
         } else {
           segmentFileName = SegmentFileStore.writeSegmentFile(
-            carbonTable.getTablePath,
+            carbonTable,
             mergedLoadNumber,
             carbonLoadModel.getFactTimeStamp.toString)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 93c0b4a..30cb464 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -99,7 +99,7 @@ class CarbonSession(@transient val sc: SparkContext,
             trySearchMode(qe, sse)
           } catch {
             case e: Exception =>
-              logError(String.format(
+              log.error(String.format(
                 "Exception when executing search mode: %s", e.getMessage))
               throw e;
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 0c6d2ba..127e1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -68,12 +68,7 @@ object DeleteExecution {
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val isPartitionTable = carbonTable.isHivePartitionTable
-    val factPath = if (isPartitionTable) {
-      absoluteTableIdentifier.getTablePath
-    } else {
-      CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
-    }
+    val tablePath = absoluteTableIdentifier.getTablePath
     var segmentsTobeDeleted = Seq.empty[Segment]
 
     val deleteRdd = if (isUpdateOperation) {
@@ -114,6 +109,9 @@ object DeleteExecution {
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
 
+    val metadataDetails = SegmentStatusManager.readTableStatusFile(
+      CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+
     val rowContRdd =
       sparkSession.sparkContext.parallelize(
         blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
@@ -127,12 +125,16 @@ object DeleteExecution {
           var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]()
           while (records.hasNext) {
             val ((key), (rowCountDetailsVO, groupedRows)) = records.next
+            val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
+            val segmentFile =
+              metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile
             result = result ++
                      deleteDeltaFunc(index,
                        key,
                        groupedRows.toIterator,
                        timestamp,
-                       rowCountDetailsVO)
+                       rowCountDetailsVO,
+                       segmentFile)
           }
           result
         }
@@ -219,7 +221,8 @@ object DeleteExecution {
         key: String,
         iter: Iterator[Row],
         timestamp: String,
-        rowCountDetailsVO: RowCountDetailsVO
+        rowCountDetailsVO: RowCountDetailsVO,
+        segmentFile: String
     ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = {
 
       val result = new DeleteDelataResultImpl()
@@ -255,7 +258,7 @@ object DeleteExecution {
             countOfRows = countOfRows + 1
           }
 
-          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath, isPartitionTable)
+          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, tablePath, segmentFile != null)
           val completeBlockName = CarbonTablePath
             .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
                                CarbonCommonConstants.FACT_FILE_EXT)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index f3b4be7..857cd81 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -103,6 +103,9 @@ case class PreAggregateTableHelper(
       .LOAD_SORT_SCOPE_DEFAULT))
     tableProperties
       .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+    tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
+      parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
+        CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
     val tableIdentifier =
       TableIdentifier(parentTable.getTableName + "_" + dataMapName,
         Some(parentTable.getDatabaseName))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 7d15cc1..617f5e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -138,6 +138,11 @@ private[sql] case class CarbonDescribeFormattedCommand(
         tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS),
         CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT))
     }
+    if (tblProps.containsKey(CarbonCommonConstants.FLAT_FOLDER)) {
+      results ++= Seq((CarbonCommonConstants.FLAT_FOLDER.toUpperCase,
+        tblProps.get(CarbonCommonConstants.FLAT_FOLDER),
+        CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
+    }
 
     results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
     if (colPropStr.length() > 0) {

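For reference, a hedged sketch of how the property surfaces in describe output (the table name and exact column layout are assumptions):

    spark.sql("DESCRIBE FORMATTED flat_table").show(100, false)
    // Expected to include a row built from the three values added above, e.g.:
    // FLAT_FOLDER    true    false
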
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 0bdef8a..7cee409 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -20,13 +20,15 @@ package org.apache.carbondata.spark.testsuite.partition
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -855,15 +857,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     validatePartitionTableFiles(partitions, dataFiles)
   }
 
-  def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
-    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".carbondata")
-      }
-    })
-    dataFiles
+  def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[String] = {
+    val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
+    if (segment.getSegmentFileName != null) {
+      val sfs = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+      sfs.readIndexFiles()
+      val indexFilesMap = sfs.getIndexFilesMap
+      val dataFiles = indexFilesMap.asScala.flatMap(_._2.asScala).map(f => new Path(f).getName)
+      dataFiles.toArray
+    } else {
+      val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+      val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+      val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          return file.getName.endsWith(".carbondata")
+        }
+      })
+      dataFiles.map(_.getName)
+    }
   }
 
   /**
@@ -871,10 +882,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
    * @param partitions
    * @param dataFiles
    */
-  def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[CarbonFile]): Unit = {
+  def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[String]): Unit = {
     val partitionIds: ListBuffer[Int] = new ListBuffer[Int]()
     dataFiles.foreach { dataFile =>
-      val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName).split("_")(0).toInt
+      val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile).split("_")(0).toInt
       partitionIds += partitionId
       assert(partitions.contains(partitionId))
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 48733dc..1c7cb10 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -43,9 +43,9 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
     // 2096 is the size of carbon table
-    assertResult(2096)(result(0).getLong(1))
+    assertResult(2098)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2096)(result(1).getLong(1))
+    assertResult(2098)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 3dc34d3..bfa498e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -165,6 +165,7 @@ public class DataMapWriterListener {
         writer.finish();
       }
     }
+    registry.clear();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
index eb02ede..fcabef5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -170,7 +170,8 @@ public abstract class AbstractDataLoadProcessorStep {
             carbonDataFileAttributes.getTaskId(),
             bucketId,
             0,
-            String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+            String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
+            configuration.getSegmentId()));
     return listener;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index bc28ace..5bed8b1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -64,7 +64,7 @@ public class TableProcessingOperations {
       CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
         @Override public boolean accept(CarbonFile path) {
           String segmentId =
-              CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+              CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
           boolean found = false;
           for (int j = 0; j < details.length; j++) {
             if (details[j].getLoadName().equals(segmentId)) {
@@ -76,8 +76,8 @@ public class TableProcessingOperations {
         }
       });
       for (int k = 0; k < listFiles.length; k++) {
-        String segmentId =
-            CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+        String segmentId = CarbonTablePath.DataFileUtil
+            .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
         if (isCompactionFlow) {
           if (segmentId.contains(".")) {
             CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 90c297e..4d3f3fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -83,7 +84,7 @@ public class CarbonLoadModel implements Serializable {
   /**
    * load Id
    */
-  private String segmentId;
+  private Segment segment;
 
   private String allDictPath;
 
@@ -424,7 +425,7 @@ public class CarbonLoadModel implements Serializable {
     copy.blocksID = blocksID;
     copy.taskNo = taskNo;
     copy.factTimeStamp = factTimeStamp;
-    copy.segmentId = segmentId;
+    copy.segment = segment;
     copy.serializationNullFormat = serializationNullFormat;
     copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
     copy.badRecordsAction = badRecordsAction;
@@ -479,7 +480,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.blocksID = blocksID;
     copyObj.taskNo = taskNo;
     copyObj.factTimeStamp = factTimeStamp;
-    copyObj.segmentId = segmentId;
+    copyObj.segment = segment;
     copyObj.serializationNullFormat = serializationNullFormat;
     copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
     copyObj.badRecordsAction = badRecordsAction;
@@ -609,14 +610,24 @@ public class CarbonLoadModel implements Serializable {
    * @return load Id
    */
   public String getSegmentId() {
-    return segmentId;
+    if (segment != null) {
+      return segment.getSegmentNo();
+    } else {
+      return null;
+    }
   }
 
   /**
    * @param segmentId
    */
   public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
+    if (segmentId != null) {
+      this.segment = Segment.toSegment(segmentId);
+    }
+  }
+
+  public Segment getSegment() {
+    return segment;
   }
 
   /**

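A hedged round-trip sketch of the CarbonLoadModel change above (Scala; the values are hypothetical):

    val model = new CarbonLoadModel()
    model.setSegmentId("0")      // now stored internally as a Segment
    model.getSegmentId()         // returns "0", derived from the Segment
    model.getSegment()           // returns the Segment object itself
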
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index 7a11c8b..b22599d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.merger;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -46,10 +47,11 @@ public abstract class AbstractResultProcessor {
   public abstract void close();
 
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
-      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+      CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel)
+      throws IOException {
     CarbonDataFileAttributes carbonDataFileAttributes;
     if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
-      long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
+      long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegment(),
           loadModel.getTablePath());
       // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
       // be written in same segment. So the TaskNo should be incremented by 1 from max val.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index fef8ab9..dde18a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -175,7 +175,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
                   partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
                   partitionSpec.getPartitions());
         } catch (IOException e) {
-          isCompactionSuccess = false;
           throw e;
         }
       }
@@ -428,6 +427,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
             tempStoreLocation, carbonStoreLocation);
+    carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId());
     setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
     dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
         CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 9a3258e..b877d52 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -63,7 +63,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
 
   public RowResultMergerProcessor(String databaseName,
       String tableName, SegmentProperties segProp, String[] tempStoreLocation,
-      CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) {
+      CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec)
+      throws IOException {
     this.segprop = segProp;
     this.partitionSpec = partitionSpec;
     this.loadModel = loadModel;
@@ -84,6 +85,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
             tempStoreLocation, carbonStoreLocation);
     setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
+    carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 221697f..9b09269 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -59,6 +59,7 @@ public class RowResultProcessor {
     carbonFactDataHandlerModel.setBucketId(bucketId);
     //Note: set compaction flow just to convert decimal type
     carbonFactDataHandlerModel.setCompactionFlow(true);
+    carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 87a6de0..27249ab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -267,7 +267,8 @@ public class CarbonFactDataHandlerModel {
               carbonDataFileAttributes.getTaskId(),
               bucketId,
               0,
-              String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+              String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
+              configuration.getSegmentId()));
     }
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
@@ -337,7 +338,8 @@ public class CarbonFactDataHandlerModel {
             CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(loadModel.getTaskNo()),
             carbonFactDataHandlerModel.getBucketId(),
             carbonFactDataHandlerModel.getTaskExtension(),
-            String.valueOf(loadModel.getFactTimeStamp())));
+            String.valueOf(loadModel.getFactTimeStamp()),
+            loadModel.getSegmentId()));
 
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     return carbonFactDataHandlerModel;


[3/3] carbondata git commit: [CARBONDATA-2428] Support flat folder for managed carbon table

Posted by ja...@apache.org.
[CARBONDATA-2428] Support flat folder for managed carbon table

Currently carbondata files are written to the fixed path tablepath/Fact/Part0/Segment_NUM, which is not the same as the hive/parquet folder structure. With this PR all files are written directly inside tablepath, with no segment folder structure maintained; only for partitioned tables are sub folders added.

This feature is controlled through the table property flat_folder; its default value is false.
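
For illustration, a hedged usage sketch (the table name and the CarbonSession named spark are hypothetical, not part of this commit):

    // Create a managed table that stores its files directly under the table path.
    spark.sql(
      """
        | CREATE TABLE flat_table (id INT, name STRING)
        | STORED BY 'carbondata'
        | TBLPROPERTIES('flat_folder'='true')
      """.stripMargin)
    // With flat_folder=true, carbondata and carbonindex files are written
    // directly under tablepath instead of Fact/Part0/Segment_N folders.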

This closes #2207


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/60dfdd38
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/60dfdd38
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/60dfdd38

Branch: refs/heads/master
Commit: 60dfdd3857d037231844d9fb95967dcdb0071f40
Parents: 181f0ac
Author: ravipesala <ra...@gmail.com>
Authored: Fri Apr 20 07:34:00 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Jun 16 10:56:10 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   8 ++
 .../apache/carbondata/core/datamap/Segment.java |  20 +++
 .../carbondata/core/datamap/TableDataMap.java   |   4 +-
 .../core/datamap/dev/DataMapWriter.java         |   3 +-
 .../core/datastore/block/TableBlockInfo.java    |  27 ++--
 .../blockletindex/BlockletDataMapFactory.java   |   4 +-
 .../core/metadata/SegmentFileStore.java         |  91 ++++++++++----
 .../core/metadata/schema/table/CarbonTable.java |  13 ++
 .../core/mutate/CarbonUpdateUtil.java           |  71 ++++++-----
 .../executor/impl/AbstractQueryExecutor.java    |  18 ++-
 .../core/scan/result/BlockletScannedResult.java |   3 +-
 .../scan/scanner/impl/BlockletFullScanner.java  |   6 +-
 .../SegmentUpdateStatusManager.java             |  40 +++---
 .../apache/carbondata/core/util/CarbonUtil.java |  18 +--
 .../core/util/path/CarbonTablePath.java         |  77 +++++++++---
 .../datastore/block/TableBlockInfoTest.java     |   2 +-
 .../CarbonFormatDirectoryStructureTest.java     |  22 +---
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  10 +-
 .../lucene/LuceneDataMapFactoryBase.java        |  17 ++-
 .../carbondata/hadoop/CarbonInputSplit.java     |  35 ++++--
 .../hadoop/api/CarbonInputFormat.java           |   2 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  12 +-
 .../sdv/generated/DataLoadingIUDTestCase.scala  |   7 +-
 .../sdv/generated/MergeIndexTestCase.scala      |  55 ++++----
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  36 ++++--
 .../dataload/TestLoadDataGeneral.scala          |   9 +-
 .../InsertIntoCarbonTableTestCase.scala         |  33 ++---
 .../CarbonIndexFileMergeTestCase.scala          |  53 +++++---
 ...ompactionSupportGlobalSortFunctionTest.scala |  15 ++-
 ...mpactionSupportGlobalSortParameterTest.scala |  15 ++-
 .../dataload/TestBatchSortDataLoad.scala        |  18 +--
 .../dataload/TestDataLoadWithFileName.scala     |  26 ++--
 .../dataload/TestGlobalSortDataLoad.scala       |  23 +++-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  10 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |   6 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |  27 ++--
 .../FlatFolderTableLoadingTestCase.scala        | 125 +++++++++++++++++++
 .../iud/UpdateCarbonTableTestCase.scala         |  10 +-
 .../TestDataLoadingForPartitionTable.scala      |  27 ++--
 .../load/DataLoadProcessorStepOnSpark.scala     |   2 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   7 +-
 .../carbondata/spark/util/CommonUtil.scala      |  17 +++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +
 .../org/apache/spark/util/PartitionUtils.scala  |  73 +++++++++--
 .../datamap/IndexDataMapRebuildRDD.scala        |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  89 ++++++++++---
 .../spark/rdd/CarbonTableCompactor.scala        |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +-
 .../command/mutation/DeleteExecution.scala      |  23 ++--
 .../preaaggregate/PreAggregateTableHelper.scala |   3 +
 .../table/CarbonDescribeFormattedCommand.scala  |   5 +
 .../partition/TestAlterPartitionTable.scala     |  35 ++++--
 .../CarbonGetTableDetailComandTestCase.scala    |   4 +-
 .../datamap/DataMapWriterListener.java          |   1 +
 .../loading/AbstractDataLoadProcessorStep.java  |   3 +-
 .../loading/TableProcessingOperations.java      |   6 +-
 .../loading/model/CarbonLoadModel.java          |  21 +++-
 .../merger/AbstractResultProcessor.java         |   6 +-
 .../merger/CompactionResultSortProcessor.java   |   2 +-
 .../merger/RowResultMergerProcessor.java        |   4 +-
 .../partition/spliter/RowResultProcessor.java   |   1 +
 .../store/CarbonFactDataHandlerModel.java       |   6 +-
 .../store/writer/AbstractFactDataWriter.java    |   6 +-
 .../processing/util/CarbonLoaderUtil.java       |   4 +-
 .../store/worker/SearchRequestHandler.java      |  16 ++-
 .../streaming/CarbonStreamRecordWriter.java     |   2 +-
 67 files changed, 961 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 19ff494..2fcf0f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -718,6 +718,11 @@ public final class CarbonCommonConstants {
   public static final String DEFAULT_ENABLE_AUTO_LOAD_MERGE = "false";
 
   /**
+   * DEFAULT_FLAT_FOLDER
+   */
+  public static final String DEFAULT_FLAT_FOLDER = "false";
+
+  /**
    * ZOOKEEPER_ENABLE_LOCK if this is set to true then zookeeper will be used to handle locking
    * mechanism of carbon
    */
@@ -929,6 +934,9 @@ public final class CarbonCommonConstants {
   public static final String TABLE_COMPACTION_PRESERVE_SEGMENTS = "compaction_preserve_segments";
   // table property name of allowed compaction days while compaction
   public static final String TABLE_ALLOWED_COMPACTION_DAYS = "allowed_compaction_days";
+  // Flat folder support on table. When it is true, all carbondata files are stored directly
+  // under the table path instead of in sub folders.
+  public static final String FLAT_FOLDER = "flat_folder";
 
   /**
    * 16 mb size

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 7b63b84..425cdf6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -162,6 +162,16 @@ public class Segment implements Serializable {
   }
 
   /**
+   * Converts a segmentId string to a Segment object.
+   * @param segmentId
+   * @return
+   */
+  public static Segment toSegment(String segmentId) {
+    // SegmentId can be a combination of segmentNo and segmentFileName.
+    return toSegment(segmentId, null);
+  }
+
+  /**
    * Read the table status and get the segment corresponding to segmentNo
    * @param segmentNo
    * @param tablePath
@@ -170,6 +180,16 @@ public class Segment implements Serializable {
   public static Segment getSegment(String segmentNo, String tablePath) {
     LoadMetadataDetails[] loadMetadataDetails =
         SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
+    return getSegment(segmentNo, loadMetadataDetails);
+  }
+
+  /**
+   * Get the segment object corresponding to segmentNo
+   * @param segmentNo
+   * @param loadMetadataDetails
+   * @return
+   */
+  public static Segment getSegment(String segmentNo, LoadMetadataDetails[] loadMetadataDetails) {
     for (LoadMetadataDetails details: loadMetadataDetails) {
       if (details.getLoadName().equals(segmentNo)) {
         return new Segment(details.getLoadName(), details.getSegmentFile());

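A hedged sketch of resolving a Segment through the helpers above (Scala; the table path is hypothetical):

    val tablePath = "/tmp/store/default/flat_table"
    // Reads the table status and returns the matching Segment, including its
    // segment file name when one has been recorded.
    val segment = Segment.getSegment("0", tablePath)
    if (segment != null && segment.getSegmentFileName != null) {
      // Flat-folder loads are tracked through the segment file rather than a
      // physical Fact/Part0/Segment_0 directory.
    }
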
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 4ce0f6c..cb7ec03 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -103,7 +103,7 @@ public final class TableDataMap extends OperationEventListener {
       }
       blocklets.addAll(addSegmentId(
           blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
-          segment.getSegmentNo()));
+          segment.toString()));
     }
     return blocklets;
   }
@@ -182,7 +182,7 @@ public final class TableDataMap extends OperationEventListener {
         detailedBlocklet.setDataMapWriterPath(blockletwritePath);
         serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
       }
-      detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo());
+      detailedBlocklet.setSegmentId(distributable.getSegment().toString());
       detailedBlocklets.add(detailedBlocklet);
     }
     return detailedBlocklets;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 89d5d76..8c8d2d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.datamap.dev;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
@@ -133,7 +132,7 @@ public abstract class DataMapWriter {
    */
   public static String getDefaultDataMapPath(
       String tablePath, String segmentId, String dataMapName) {
-    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+    return CarbonTablePath.getDataMapStorePath(tablePath, segmentId, dataMapName);
   }
 
   public boolean isWritingFinished() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index c0cebe0..34d406b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -64,7 +65,7 @@ public class TableBlockInfo implements Distributable, Serializable {
   /**
    * id of the segment this will be used to sort the blocks
    */
-  private String segmentId;
+  private Segment segment;
 
   /**
    * id of the Blocklet.
@@ -120,7 +121,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockletId = "0";
     this.blockOffset = blockOffset;
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     this.locations = locations;
     this.blockLength = blockLength;
     this.version = version;
@@ -196,7 +197,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     info.filePath = filePath;
     info.blockOffset = blockOffset;
     info.blockLength = blockLength;
-    info.segmentId = segmentId;
+    info.segment = segment;
     info.blockletId = blockletId;
     info.locations = locations;
     info.version = version;
@@ -229,7 +230,15 @@ public class TableBlockInfo implements Distributable, Serializable {
    * @return the segmentId
    */
   public String getSegmentId() {
-    return segmentId;
+    if (segment == null) {
+      return null;
+    } else {
+      return segment.getSegmentNo();
+    }
+  }
+
+  public Segment getSegment() {
+    return segment;
   }
 
   /**
@@ -264,7 +273,7 @@ public class TableBlockInfo implements Distributable, Serializable {
       return false;
     }
     TableBlockInfo other = (TableBlockInfo) obj;
-    if (!segmentId.equals(other.segmentId)) {
+    if (!segment.equals(other.segment)) {
       return false;
     }
     if (blockOffset != other.blockOffset) {
@@ -300,8 +309,8 @@ public class TableBlockInfo implements Distributable, Serializable {
     // get the segment id
     // converr seg ID to double.
 
-    double seg1 = Double.parseDouble(segmentId);
-    double seg2 = Double.parseDouble(((TableBlockInfo) other).segmentId);
+    double seg1 = Double.parseDouble(segment.getSegmentNo());
+    double seg2 = Double.parseDouble(((TableBlockInfo) other).segment.getSegmentNo());
     if (seg1 - seg2 < 0) {
       return -1;
     }
@@ -358,7 +367,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     int result = filePath.hashCode();
     result = 31 * result + (int) (blockOffset ^ (blockOffset >>> 32));
     result = 31 * result + (int) (blockLength ^ (blockLength >>> 32));
-    result = 31 * result + segmentId.hashCode();
+    result = 31 * result + segment.hashCode();
     result = 31 * result + blockletInfos.getStartBlockletNumber();
     return result;
   }
@@ -457,7 +466,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     sb.append("filePath='").append(filePath).append('\'');
     sb.append(", blockOffset=").append(blockOffset);
     sb.append(", blockLength=").append(blockLength);
-    sb.append(", segmentId='").append(segmentId).append('\'');
+    sb.append(", segment='").append(segment.toString()).append('\'');
     sb.append(", blockletId='").append(blockletId).append('\'');
     sb.append(", locations=").append(Arrays.toString(locations));
     sb.append('}');

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index c434e2e..65fcb4b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -122,7 +122,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers =
           BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
-      segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+      if (tableBlockIndexUniqueIdentifiers.size() > 0) {
+        segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+      }
     }
     return tableBlockIndexUniqueIdentifiers;
   }

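The guard above changes the cache policy: an empty identifier list is no longer pinned in segmentMap, so a segment whose index files are not visible yet gets re-resolved on the next lookup. The pattern in isolation (hypothetical names, plain HashMap):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class NonEmptyResultCache {
      private final Map<String, List<String>> cache = new HashMap<>();

      List<String> get(String segmentNo) {
        List<String> values = cache.get(segmentNo);
        if (values == null) {
          values = loadFromStore(segmentNo);   // stand-in for the expensive scan
          if (values.size() > 0) {             // cache only non-empty results
            cache.put(segmentNo, values);
          }
        }
        return values;
      }

      private List<String> loadFromStore(String segmentNo) {
        return new ArrayList<>();              // hypothetical loader
      }
    }
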
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index acfc145..0b1c1e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -140,13 +140,16 @@ public class SegmentFileStore {
 
   /**
    * Write segment file to the metadata folder of the table
-   * @param tablePath table path
+   *
+   * @param carbonTable CarbonTable
    * @param segmentId segment id
-   * @param UUID a UUID string used to construct the segment file name
+   * @param UUID      a UUID string used to construct the segment file name
    * @return segment file name
    */
-  public static String writeSegmentFile(String tablePath, String segmentId, String UUID)
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID)
       throws IOException {
+    String tablePath = carbonTable.getTablePath();
+    boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
     String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
     CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
     CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
@@ -167,9 +170,12 @@ public class SegmentFileStore {
           folderDetails.getFiles().add(file.getName());
         }
       }
-      String segmentRelativePath = segmentPath.substring(tablePath.length(), segmentPath.length());
+      String segmentRelativePath = "/";
+      if (!supportFlatFolder) {
+        segmentRelativePath = segmentPath.substring(tablePath.length(), segmentPath.length());
+      }
       segmentFile.addPath(segmentRelativePath, folderDetails);
-      String segmentFileFolder =  CarbonTablePath.getSegmentFilesLocation(tablePath);
+      String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
       CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
       if (!carbonFile.exists()) {
         carbonFile.mkdirs(segmentFileFolder, FileFactory.getFileType(segmentFileFolder));
@@ -177,12 +183,31 @@ public class SegmentFileStore {
       String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
       // write segment info to new file.
       writeSegmentFile(segmentFile, segmentFileFolder + File.separator + segmentFileName);
+
+      // Move all files from the segment folder to the table path.
+      if (supportFlatFolder) {
+        moveFromTempFolder(segmentPath, tablePath);
+      }
+
       return segmentFileName;
     }
     return null;
   }
 
   /**
+   * Move the loaded data from source folder to destination folder.
+   */
+  private static void moveFromTempFolder(String source, String dest) {
+
+    CarbonFile oldFolder = FileFactory.getCarbonFile(source);
+    CarbonFile[] oldFiles = oldFolder.listFiles();
+    for (CarbonFile file : oldFiles) {
+      file.renameForce(dest + CarbonCommonConstants.FILE_SEPARATOR + file.getName());
+    }
+    oldFolder.delete();
+  }
+
+  /**
    * Writes the segment file in json format
    * @param segmentFile
    * @param path
@@ -218,26 +243,37 @@ public class SegmentFileStore {
       throws IOException {
     CarbonFile[] segmentFiles = getSegmentFiles(readPath);
     if (segmentFiles != null && segmentFiles.length > 0) {
-      SegmentFile segmentFile = null;
-      for (CarbonFile file : segmentFiles) {
-        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
-        if (segmentFile == null && localSegmentFile != null) {
-          segmentFile = localSegmentFile;
-        }
-        if (localSegmentFile != null) {
-          segmentFile = segmentFile.merge(localSegmentFile);
-        }
-      }
-      if (segmentFile != null) {
-        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
-        writeSegmentFile(segmentFile, path);
-        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
-      }
+      SegmentFile segmentFile = mergeSegmentFiles(mergeFileName, writePath, segmentFiles);
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
       return segmentFile;
     }
     return null;
   }
 
+  public static SegmentFile mergeSegmentFiles(String mergeFileName, String writePath,
+      CarbonFile[] segmentFiles) throws IOException {
+    SegmentFile segmentFile = null;
+    for (CarbonFile file : segmentFiles) {
+      SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+      if (segmentFile == null && localSegmentFile != null) {
+        segmentFile = localSegmentFile;
+      }
+      if (localSegmentFile != null) {
+        segmentFile = segmentFile.merge(localSegmentFile);
+      }
+    }
+    if (segmentFile != null) {
+      String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+      writeSegmentFile(segmentFile, path);
+    }
+    return segmentFile;
+  }
+
+  public static String getSegmentFilePath(String tablePath, String segmentFileName) {
+    return CarbonTablePath.getSegmentFilesLocation(tablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + segmentFileName;
+  }
+
   /**
    * This API will update the segmentFile of a passed segment.
    *
@@ -248,6 +284,9 @@ public class SegmentFileStore {
       throws IOException {
     boolean status = false;
     String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
+    if (!FileFactory.isFileExist(tableStatusPath)) {
+      return status;
+    }
     String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
     AbsoluteTableIdentifier absoluteTableIdentifier =
         AbsoluteTableIdentifier.from(tablePath, null, null);
@@ -654,7 +693,10 @@ public class SegmentFileStore {
             new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
         fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
         if (forceDelete) {
-          deletePhysicalPartition(partitionSpecs, fileStore.getIndexFilesMap());
+          deletePhysicalPartition(
+              partitionSpecs,
+              fileStore.getIndexFilesMap(),
+              table.getTablePath());
         }
         for (Map.Entry<String, List<String>> entry : fileStore.indexFilesMap.entrySet()) {
           String indexFile = entry.getKey();
@@ -698,7 +740,7 @@ public class SegmentFileStore {
         FileFactory.deleteFile(file, FileFactory.getFileType(file));
       }
     }
-    deletePhysicalPartition(partitionSpecs, indexFilesMap);
+    deletePhysicalPartition(partitionSpecs, indexFilesMap, tablePath);
     String segmentFilePath =
         CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
             + segmentFile;
@@ -713,7 +755,7 @@ public class SegmentFileStore {
   * If partition specs are null, then directly delete the parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap) throws IOException {
+      Map<String, List<String>> locationMap, String tablePath) throws IOException {
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
       if (partitionSpecs != null) {
         Path location = new Path(entry.getKey());
@@ -733,7 +775,8 @@ public class SegmentFileStore {
         Path location = new Path(entry.getKey()).getParent();
         // delete the segment folder
         CarbonFile segmentPath = FileFactory.getCarbonFile(location.toString());
-        if (null != segmentPath) {
+        if (null != segmentPath && segmentPath.exists() &&
+            !new Path(tablePath).equals(new Path(segmentPath.getAbsolutePath()))) {
           FileFactory.deleteAllCarbonFilesOfDir(segmentPath);
         }
       }

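moveFromTempFolder above leans on CarbonFile.renameForce to pull every data and index file out of the segment folder and drop it directly under the table path. A minimal plain-java.io sketch of the same move, without the overwrite semantics or HDFS support of the real CarbonFile API:

    import java.io.File;

    final class FlatFolderMove {
      // Move every file from the segment folder up into the table path, then drop the folder.
      static void moveUp(String segmentPath, String tablePath) {
        File segmentDir = new File(segmentPath);
        File[] files = segmentDir.listFiles();
        if (files != null) {
          for (File f : files) {
            // renameTo is best-effort on local disk; renameForce also overwrites targets
            if (!f.renameTo(new File(tablePath, f.getName()))) {
              throw new IllegalStateException("could not move " + f);
            }
          }
        }
        segmentDir.delete();
      }
    }
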
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 20bc7a1..f48ada0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1008,6 +1008,19 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * Whether this table supports a flat folder structure, meaning all data files are written
+   * directly under the table path
+   */
+  public boolean isSupportFlatFolder() {
+    boolean supportFlatFolder = Boolean.parseBoolean(CarbonCommonConstants.DEFAULT_FLAT_FOLDER);
+    Map<String, String> tblProps = getTableInfo().getFactTable().getTableProperties();
+    if (tblProps.containsKey(CarbonCommonConstants.FLAT_FOLDER)) {
+      supportFlatFolder = tblProps.get(CarbonCommonConstants.FLAT_FOLDER).equalsIgnoreCase("true");
+    }
+    return supportFlatFolder;
+  }
+
+  /**
    * update the carbon table by using the passed tableInfo
    *
    * @param table

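isSupportFlatFolder reduces to "parse the table property if present, else fall back to the default". In isolation (the constant values shown are assumptions; the real keys live in CarbonCommonConstants):

    import java.util.Map;

    final class FlatFolderProperty {
      static final String FLAT_FOLDER = "flat_folder";     // assumed property key
      static final String DEFAULT_FLAT_FOLDER = "false";   // assumed default

      static boolean isSupportFlatFolder(Map<String, String> tblProps) {
        return tblProps.getOrDefault(FLAT_FOLDER, DEFAULT_FLAT_FOLDER)
            .equalsIgnoreCase("true");
      }
    }

So a table opts in through its table properties, and everything else keeps the nested Fact/Part0/Segment_N layout.
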
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 1d0ef44..40d498c 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -47,6 +47,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.fs.Path;
+
 /**
  * This class contains all update utility methods
  */
@@ -78,20 +80,17 @@ public class CarbonUpdateUtil {
 
   /**
    * Returns block path from tuple id
-   *
-   * @param tid
-   * @param factPath
-   * @return
    */
-  public static String getTableBlockPath(String tid, String factPath, boolean isPartitionTable) {
+  public static String getTableBlockPath(String tid, String tablePath, boolean isSegmentFile) {
     String partField = getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID);
-    if (isPartitionTable) {
-      return factPath + CarbonCommonConstants.FILE_SEPARATOR + partField;
+    // If it has a segment file, the part field can be appended directly to the table path
+    if (isSegmentFile) {
+      return tablePath + CarbonCommonConstants.FILE_SEPARATOR + partField.replace("#", "/");
     }
     String part = CarbonTablePath.addPartPrefix(partField);
     String segment =
             CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
-    return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
+    return CarbonTablePath.getFactDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + part
             + CarbonCommonConstants.FILE_SEPARATOR + segment;
   }
 
@@ -386,29 +385,45 @@ public class CarbonUpdateUtil {
     return segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
   }
 
-  public static long getLatestTaskIdForSegment(String segmentId, String tablePath) {
-    String segmentDirPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+  public static long getLatestTaskIdForSegment(Segment segment, String tablePath)
+      throws IOException {
+    long max = 0;
+    List<String> dataFiles = new ArrayList<>();
+    if (segment.getSegmentFileName() != null) {
+      SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+      fileStore.readIndexFiles();
+      Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+      List<String> dataFilePaths = new ArrayList<>();
+      for (List<String> paths : indexFilesMap.values()) {
+        dataFilePaths.addAll(paths);
+      }
+      for (String dataFilePath : dataFilePaths) {
+        dataFiles.add(new Path(dataFilePath).getName());
+      }
 
-    // scan all the carbondata files and get the latest task ID.
-    CarbonFile segment =
-            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
-    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
+    } else {
+      String segmentDirPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
+      // scan all the carbondata files and get the latest task ID.
+      CarbonFile segmentDir =
+          FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+      CarbonFile[] carbonDataFiles = segmentDir.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
 
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
+          if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+            return true;
+          }
+          return false;
         }
-        return false;
+      });
+      for (CarbonFile carbonDataFile : carbonDataFiles) {
+        dataFiles.add(carbonDataFile.getName());
       }
-    });
-    long max = 0;
-    if (null != dataFiles) {
-      for (CarbonFile file : dataFiles) {
-        long taskNumber =
-            Long.parseLong(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()).split("_")[0]);
-        if (taskNumber > max) {
-          max = taskNumber;
-        }
+    }
+    for (String name : dataFiles) {
+      long taskNumber =
+          Long.parseLong(CarbonTablePath.DataFileUtil.getTaskNo(name).split("_")[0]);
+      if (taskNumber > max) {
+        max = taskNumber;
       }
     }
     // return max task No
@@ -562,7 +577,7 @@ public class CarbonUpdateUtil {
     List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
     for (Segment segment : segmentFilesToBeUpdated) {
       String file =
-          SegmentFileStore.writeSegmentFile(table.getTablePath(), segment.getSegmentNo(), UUID);
+          SegmentFileStore.writeSegmentFile(table, segment.getSegmentNo(), UUID);
       segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), file));
     }
     if (segmentFilesToBeUpdated.size() > 0) {

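getTableBlockPath now branches on the presence of a segment file rather than on "is partition table". A simplified sketch of both branches (the tuple-id layout assumed here is '/'-separated fields with the part id first and the segment id second):

    final class BlockPathSketch {
      static String tableBlockPath(String tid, String tablePath, boolean hasSegmentFile) {
        String[] fields = tid.split("/");
        String partField = fields[0];
        String segmentId = fields[1];
        if (hasSegmentFile) {
          // '#' encodes nested partition directories inside the part field
          return tablePath + "/" + partField.replace("#", "/");
        }
        // legacy layout under the fact directory
        return tablePath + "/Fact/Part" + partField + "/Segment_" + segmentId;
      }
    }
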
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff0e5ce..2bbe75c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.common.logging.impl.StandardLogService;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -68,6 +69,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -256,7 +258,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
               dataRefNode.numberOfNodes(),
               dataRefNode.getBlockInfos().get(0).getFilePath(),
               dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
-              dataRefNode.getBlockInfos().get(0).getSegmentId()));
+              dataRefNode.getBlockInfos().get(0).getSegment()));
     }
     if (null != queryModel.getStatisticsRecorder()) {
       QueryStatistic queryStatistic = new QueryStatistic();
@@ -278,7 +280,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
       AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath,
-      String[] deleteDeltaFiles, String segmentId)
+      String[] deleteDeltaFiles, Segment segment)
       throws QueryExecutionException {
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
     SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
@@ -291,9 +293,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
             queryModel.getProjectionDimensions(), tableBlockDimensions,
             segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
             queryModel.getTable().getTableInfo().isTransactionalTable());
-    blockExecutionInfo.setBlockId(
-        CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
-            queryModel.getTable().getTableInfo().isTransactionalTable()));
+    String blockId = CarbonUtil
+        .getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segment.getSegmentNo(),
+            queryModel.getTable().getTableInfo().isTransactionalTable(),
+            segment.getSegmentFileName() != null);
+    if (segment.getSegmentFileName() != null) {
+      blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockIdForPartitionTable(blockId));
+    } else {
+      blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockId(blockId));
+    }
     blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index b85945f..eadd502 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * Scanned result class which will store and provide the result on request
@@ -525,7 +524,7 @@ public abstract class BlockletScannedResult {
    * "Part0/Segment_0/part-0-0_batchno0-0-1517155583332.carbondata/0"
    */
   public void setBlockletId(String blockletId) {
-    this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
+    this.blockletId = blockletId;
     blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
    // if deleted records map is present for this block
     // then get the first page deleted vo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index a48804c..1665ce6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -82,9 +82,9 @@ public class BlockletFullScanner implements BlockletScanner {
         .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
     totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
         totalPagesScanned.getCount() + rawBlockletColumnChunks.getDataBlock().numberOfPages());
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
-            rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    String blockletId = blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR
+        + rawBlockletColumnChunks.getDataBlock().blockletIndex();
+    scannedResult.setBlockletId(blockletId);
     if (!blockExecutionInfo.isPrefetchBlocklet()) {
       readBlocklet(rawBlockletColumnChunks);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index c2faadc..eb850e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -69,7 +69,6 @@ public class SegmentUpdateStatusManager {
   private LoadMetadataDetails[] segmentDetails;
   private SegmentUpdateDetails[] updateDetails;
   private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
-  private boolean isPartitionTable;
 
   public SegmentUpdateStatusManager(CarbonTable table,
       LoadMetadataDetails[] segmentDetails) {
@@ -77,7 +76,6 @@ public class SegmentUpdateStatusManager {
    // currently it is used only for read scenarios, as file updates always require working
    // on the latest file status.
     this.segmentDetails = segmentDetails;
-    isPartitionTable = table.isHivePartitionTable();
     updateDetails = readLoadMetadata();
     populateMap();
   }
@@ -93,7 +91,6 @@ public class SegmentUpdateStatusManager {
       segmentDetails = SegmentStatusManager.readLoadMetadata(
           CarbonTablePath.getMetadataPath(identifier.getTablePath()));
     }
-    isPartitionTable = table.isHivePartitionTable();
     if (segmentDetails.length != 0) {
       updateDetails = readLoadMetadata();
     } else {
@@ -249,27 +246,37 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true);
+    String segmentFile = null;
+    for (LoadMetadataDetails segmentDetail : segmentDetails) {
+      if (segmentDetail.getLoadName().equals(segmentId)) {
+        segmentFile = segmentDetail.getSegmentFile();
+        break;
+      }
+    }
+    String blockId =
+        CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true, segmentFile != null);
     String tupleId;
-    if (isPartitionTable) {
+    if (segmentFile != null) {
       tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
     } else {
       tupleId = CarbonTablePath.getShortBlockId(blockId);
     }
-    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT, segmentFile)
         .toArray(new String[0]);
   }
 
   /**
    * Returns all delta file paths of specified block
    */
-  private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
+  private List<String> getDeltaFiles(String tupleId, String extension, String segmentFile)
+      throws Exception {
     String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
     String completeBlockName = CarbonTablePath.addDataPartPrefix(
         CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
             + CarbonCommonConstants.FACT_FILE_EXT);
+
     String blockPath;
-    if (isPartitionTable) {
+    if (segmentFile != null) {
       blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
           + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
           .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
@@ -283,7 +290,8 @@ public class SegmentUpdateStatusManager {
     if (!file.exists()) {
       throw new Exception("Invalid tuple id " + tupleId);
     }
-    String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.'));
+    String blockNameWithoutExtn =
+        completeBlockName.substring(0, completeBlockName.lastIndexOf('.'));
     //blockName without timestamp
     final String blockNameFromTuple =
         blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
@@ -363,15 +371,15 @@ public class SegmentUpdateStatusManager {
         public boolean accept(CarbonFile pathName) {
           String fileName = pathName.getName();
           if (fileName.endsWith(extension) && pathName.getSize() > 0) {
-            String firstPart = fileName.substring(0, fileName.indexOf('.'));
+            String firstPart = fileName.substring(0, fileName.lastIndexOf('.'));
             String blockName =
-                    firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
+                firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
             long timestamp = Long.parseLong(firstPart
-                    .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
-                            firstPart.length()));
+                .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                    firstPart.length()));
             if (blockNameFromTuple.equals(blockName) && (
-                    (Long.compare(timestamp, deltaEndTimeStamp) <= 0) && (
-                            Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
+                (Long.compare(timestamp, deltaEndTimeStamp) <= 0) && (
+                    Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
               return true;
             }
           }
@@ -479,7 +487,7 @@ public class SegmentUpdateStatusManager {
 
       String fileName = eachFile.getName();
       if (fileName.endsWith(fileExtension)) {
-        String firstPart = fileName.substring(0, fileName.indexOf('.'));
+        String firstPart = fileName.substring(0, fileName.lastIndexOf('.'));
 
         long timestamp = Long.parseLong(firstPart
             .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,

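The indexOf('.') to lastIndexOf('.') changes in this file are not cosmetic: the segment number is now embedded in file names, and compacted segments have ids like 0.1, so the first dot no longer reliably starts the extension. A sketch (the sample file name is made up):

    final class ExtensionStrip {
      static String withoutExtension(String fileName) {
        // strip only the final extension; earlier dots may belong to the segment no
        return fileName.substring(0, fileName.lastIndexOf('.'));
      }

      public static void main(String[] args) {
        String name = "part-0-0_batchno0-0-0.1-1528892412345.deletedelta";
        System.out.println(withoutExtension(name));
        // -> part-0-0_batchno0-0-0.1-1528892412345
        // indexOf('.') would have cut at "part-0-0_batchno0-0-0"
      }
    }
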
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 2aa4a05..836b193 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2973,24 +2973,28 @@ public final class CarbonUtil {
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
-      String segmentId, boolean isTransactionalTable) {
+      String segmentId, boolean isTransactionalTable, boolean hasSegmentFile) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
 
     if (filePath.startsWith(tablePath)) {
-      String factDir = CarbonTablePath.getFactDir(tablePath);
-      if (filePath.startsWith(factDir) || !isTransactionalTable) {
+      if (!isTransactionalTable || !hasSegmentFile) {
         blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
             + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       } else {
         // This is the case with partition table.
-        String partitionDir =
-            filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
-
+        String partitionDir;
+        if (tablePath.length() + 1 < filePath.length() - blockName.length() - 1) {
+          partitionDir =
+              filePath.substring(tablePath.length() + 1,
+                  filePath.length() - blockName.length() - 1);
+        } else {
+          partitionDir = "";
+        }
        // Replace / with # in the partition directory to support multi-level partitioning,
        // and access them all as a single entity.
-        blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_"
+        blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR
             + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       }
     } else {

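With flat folders there is no Fact directory to test against, so getBlockId now branches on hasSegmentFile, and the partition directory may legitimately be empty when the data file sits directly under the table path. The logic in isolation (a sketch assuming '/'-separated paths where filePath starts with tablePath):

    final class BlockIdSketch {
      static String blockId(String tablePath, String filePath, String segmentId,
          boolean isTransactionalTable, boolean hasSegmentFile) {
        String blockName = filePath.substring(filePath.lastIndexOf('/') + 1);
        if (!isTransactionalTable || !hasSegmentFile) {
          return "Part0/Segment_" + segmentId + "/" + blockName;
        }
        int start = tablePath.length() + 1;
        int end = filePath.length() - blockName.length() - 1;
        String partitionDir = start < end ? filePath.substring(start, end) : "";
        // nested partition dirs are flattened with '#' so they read as one path element
        return partitionDir.replace("/", "#") + "/" + segmentId + "/" + blockName;
      }
    }
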
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index e8a121c..fe68adf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -228,7 +228,7 @@ public class CarbonTablePath {
   public static String getCarbonDataFilePath(String tablePath, String segmentId, Integer filePartNo,
       Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
     return getSegmentPath(tablePath, segmentId) + File.separator + getCarbonDataFileName(
-        filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
+        filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp, segmentId);
   }
 
   /**
@@ -283,7 +283,7 @@ public class CarbonTablePath {
       default:
         String segmentDir = getSegmentPath(tablePath, segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(taskId,
-            Integer.parseInt(bucketNumber), timeStamp);
+            Integer.parseInt(bucketNumber), timeStamp, segmentId);
     }
   }
 
@@ -297,16 +297,17 @@ public class CarbonTablePath {
       default:
         String segmentDir = getSegmentPath(tablePath, segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId),
-            Integer.parseInt(bucketNumber), batchNo, timeStamp);
+            Integer.parseInt(bucketNumber), batchNo, timeStamp, segmentId);
     }
   }
 
   private static String getCarbonIndexFileName(String taskNo, int bucketNumber,
-      String factUpdatedtimeStamp) {
+      String factUpdatedtimeStamp, String segmentNo) {
     if (bucketNumber == -1) {
-      return taskNo + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
+      return taskNo + "-" + segmentNo + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
     }
-    return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
+    return taskNo + "-" + bucketNumber + "-" + segmentNo + "-" + factUpdatedtimeStamp
+        + INDEX_FILE_EXT;
   }
 
   /**
@@ -325,14 +326,15 @@ public class CarbonTablePath {
    * @return gets data file name only with out path
    */
   public static String getCarbonDataFileName(Integer filePartNo, Long taskNo, int bucketNumber,
-      int batchNo, String factUpdateTimeStamp) {
+      int batchNo, String factUpdateTimeStamp, String segmentNo) {
     return DATA_PART_PREFIX + filePartNo + "-" + taskNo + BATCH_PREFIX + batchNo + "-"
-        + bucketNumber + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
+        + bucketNumber + "-" + segmentNo + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
   }
 
   public static String getShardName(Long taskNo, int bucketNumber, int batchNo,
-      String factUpdateTimeStamp) {
-    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdateTimeStamp;
+      String factUpdateTimeStamp, String segmentNo) {
+    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + segmentNo + "-"
+        + factUpdateTimeStamp;
   }
 
   /**
@@ -343,13 +345,13 @@ public class CarbonTablePath {
    * @return filename
    */
   public static String getCarbonIndexFileName(long taskNo, int bucketNumber, int batchNo,
-      String factUpdatedTimeStamp) {
-    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp
+      String factUpdatedTimeStamp, String segmentNo) {
+    return getShardName(taskNo, bucketNumber, batchNo, factUpdatedTimeStamp, segmentNo)
         + INDEX_FILE_EXT;
   }
 
   public static String getCarbonStreamIndexFileName() {
-    return getCarbonIndexFileName(0, 0, 0, "0");
+    return getCarbonIndexFileName(0, 0, 0, "0", "0");
   }
 
   public static String getCarbonStreamIndexFilePath(String segmentDir) {
@@ -408,11 +410,25 @@ public class CarbonTablePath {
   public static String getDataMapStorePathOnShardName(String tablePath, String segmentId,
       String dataMapName, String shardName) {
     return new StringBuilder()
-        .append(getSegmentPath(tablePath, segmentId))
+        .append(getDataMapStorePath(tablePath, segmentId, dataMapName))
+        .append(File.separator)
+        .append(shardName)
+        .toString();
+  }
+
+  /**
+   * Return the store path for a datamap based on the dataMapName
+   *
+   * @return store path based on dataMapName
+   */
+  public static String getDataMapStorePath(String tablePath, String segmentId,
+      String dataMapName) {
+    return new StringBuilder()
+        .append(tablePath)
         .append(File.separator)
         .append(dataMapName)
         .append(File.separator)
-        .append(shardName)
+        .append(segmentId)
         .toString();
   }
 
@@ -517,6 +533,29 @@ public class CarbonTablePath {
     }
 
     /**
+     * Return the segment number from the given carbon data file name
+     */
+    public static String getSegmentNo(String carbonDataFileName) {
+      // Get the file name from path
+      String fileName = getFileName(carbonDataFileName);
+      // walk the '-'-separated tokens; each + 1 skips past a separator
+      int firstDashPos = fileName.indexOf("-");
+      int startIndex1 = fileName.indexOf("-", firstDashPos + 1) + 1;
+      int endIndex1 = fileName.indexOf("-", startIndex1);
+      int startIndex = fileName.indexOf("-", endIndex1 + 1);
+      if (startIndex > -1) {
+        startIndex += 1;
+        int endIndex = fileName.indexOf("-", startIndex);
+        if (endIndex == -1) {
+          return null;
+        }
+        return fileName.substring(startIndex, endIndex);
+      } else {
+        return null;
+      }
+    }
+
+    /**
      * Return the taskId part from taskNo(include taskId + batchNo)
      */
     public static long getTaskIdFromTaskNo(String taskNo) {
@@ -545,7 +584,7 @@ public class CarbonTablePath {
     /**
     * gets segment id from given absolute data file path
      */
-    public static String getSegmentId(String dataFileAbsolutePath) {
+    public static String getSegmentIdFromPath(String dataFileAbsolutePath) {
       // find segment id from last of data file path
       String tempdataFileAbsolutePath = dataFileAbsolutePath.replace(
           CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR);
@@ -647,8 +686,7 @@ public class CarbonTablePath {
    * @return shortBlockId
    */
   public static String getShortBlockIdForPartitionTable(String blockId) {
-    return blockId.replace(SEGMENT_PREFIX, "")
-        .replace(DATA_PART_PREFIX, "")
+    return blockId.replace(DATA_PART_PREFIX, "")
         .replace(CARBON_DATA_EXT, "");
   }
 
@@ -687,7 +725,8 @@ public class CarbonTablePath {
    */
   public static String getShardName(String actualBlockName) {
     return DataFileUtil.getTaskNo(actualBlockName) + "-" + DataFileUtil.getBucketNo(actualBlockName)
-        + "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName);
+        + "-" + DataFileUtil.getSegmentNo(actualBlockName) + "-" + DataFileUtil
+        .getTimeStampFromFileName(actualBlockName);
   }
 
   /**

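The naming changes above put the segment number between the bucket number and the timestamp, i.e. part-<filePartNo>-<taskNo>_batchno<batchNo>-<bucketNo>-<segmentNo>-<timestamp>.carbondata, which is what keeps a flat table folder unambiguous. A standalone copy of the getSegmentNo token walk (same logic as the method added above, checked against the new test assertions; it expects a bare file name, no path):

    final class SegmentNoParser {
      // Walks '-'-separated tokens; returns null for old-format names
      // that carry no segment number.
      static String getSegmentNo(String fileName) {
        int firstDashPos = fileName.indexOf("-");
        int startIndex1 = fileName.indexOf("-", firstDashPos + 1) + 1;
        int endIndex1 = fileName.indexOf("-", startIndex1);
        int startIndex = fileName.indexOf("-", endIndex1 + 1);
        if (startIndex > -1) {
          startIndex += 1;
          int endIndex = fileName.indexOf("-", startIndex);
          return endIndex == -1 ? null : fileName.substring(startIndex, endIndex);
        }
        return null;
      }

      public static void main(String[] args) {
        System.out.println(getSegmentNo("part-3-4-0-999.carbondata"));   // null (old format)
        System.out.println(getSegmentNo("part-3-4-0-0-999.carbondata")); // 0
      }
    }
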
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
index ecdaf3d..e61ea6f 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
@@ -169,7 +169,7 @@ public class TableBlockInfoTest {
 
   @Test public void hashCodeTest() {
     int res = tableBlockInfo.hashCode();
-    int expectedResult = 1041505621;
+    int expectedResult = 1041506582;
     assertEquals(res, expectedResult);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 4293536..e52c737 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -34,26 +34,6 @@ public class CarbonFormatDirectoryStructureTest {
 
   private final String CARBON_STORE = "/opt/carbonstore";
 
-  /**
-   * test table path methods
-   */
-  @Test public void testTablePathStructure() throws IOException {
-    CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("d1", "t1", UUID.randomUUID().toString());
-    AbsoluteTableIdentifier identifier =
-        AbsoluteTableIdentifier.from(CARBON_STORE + "/d1/t1", tableIdentifier);
-    assertTrue(identifier.getTablePath().replace("\\", "/").equals(CARBON_STORE + "/d1/t1"));
-    assertTrue(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()).replace("\\", "/").equals(CARBON_STORE + "/d1/t1/Metadata/schema"));
-    assertTrue(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()).replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/tablestatus"));
-    assertTrue(CarbonTablePath.getDictionaryFilePath(identifier.getTablePath(), "t1_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dict"));
-    assertTrue(CarbonTablePath.getDictionaryMetaFilePath(identifier.getTablePath(), "t1_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
-    assertTrue(CarbonTablePath.getSortIndexFilePath(identifier.getTablePath(),"t1_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(CarbonTablePath.getCarbonDataFilePath(identifier.getTablePath(), "2", 3, 4L,  0, 0, "999").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata"));
-  }
 
   /**
    * test data file name
@@ -67,5 +47,7 @@ public class CarbonFormatDirectoryStructureTest {
     assertTrue(CarbonTablePath.DataFileUtil.getTaskNo("/opt/apache-carbon/part-3-4-999.carbondata").equals("4"));
     assertTrue(
         CarbonTablePath.DataFileUtil.getTimeStampFromFileName("/opt/apache-carbon/part-3-4-999.carbondata").equals("999"));
+    assertTrue(CarbonTablePath.DataFileUtil.getSegmentNo("part-3-4-0-999.carbondata") == null);
+    assertTrue(CarbonTablePath.DataFileUtil.getSegmentNo("part-3-4-0-0-999.carbondata").equals("0"));
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 3231551..cda49b3 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.datamap.bloom;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -259,8 +258,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     if (dataMaps.size() > 0) {
       for (TableDataMap dataMap : dataMaps) {
         List<CarbonFile> indexFiles;
-        String dmPath = CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator
-            + dataMap.getDataMapSchema().getDataMapName();
+        String dmPath = CarbonTablePath
+            .getDataMapStorePath(tablePath, segmentId, dataMap.getDataMapSchema().getDataMapName());
         FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
         final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
         indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
@@ -323,9 +322,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
       List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
       for (Segment segment : validSegments) {
         String segmentId = segment.getSegmentNo();
-        String datamapPath = CarbonTablePath.getSegmentPath(
-            getCarbonTable().getAbsoluteTableIdentifier().getTablePath(), segmentId)
-            + File.separator + dataMapName;
+        String datamapPath = CarbonTablePath
+            .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName);
         if (FileFactory.isFileExist(datamapPath)) {
           CarbonFile file = FileFactory.getCarbonFile(datamapPath,
               FileFactory.getFileType(datamapPath));

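The bloom (and, below, lucene) datamap store moves out of the segment directory: the new getDataMapStorePath lays files out as <tablePath>/<dataMapName>/<segmentId> instead of nesting them inside <tablePath>/Fact/Part0/Segment_<segmentId>. As plain path building (a sketch using '/' for brevity; the real code uses File.separator):

    final class DataMapPathSketch {
      // New layout: datamap name first, then segment id.
      static String dataMapStorePath(String tablePath, String segmentId, String dataMapName) {
        return tablePath + "/" + dataMapName + "/" + segmentId;
      }

      // Old layout, for comparison: nested inside the segment directory.
      static String legacyDataMapPath(String tablePath, String segmentId, String dataMapName) {
        return tablePath + "/Fact/Part0/Segment_" + segmentId + "/" + dataMapName;
      }
    }
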
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 1da8edd..cc14dc4 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.datamap.lucene;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -182,9 +181,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
       List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
       for (Segment segment : validSegments) {
         String segmentId = segment.getSegmentNo();
-        String datamapPath =
-            CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId)
-                + File.separator + dataMapName;
+        String datamapPath = CarbonTablePath
+            .getDataMapStorePath(tableIdentifier.getTablePath(), segmentId, dataMapName);
         if (FileFactory.isFileExist(datamapPath)) {
           CarbonFile file =
               FileFactory.getCarbonFile(datamapPath, FileFactory.getFileType(datamapPath));
@@ -227,9 +225,9 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
         getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo());
     if (segment.getFilteredIndexShardNames().size() == 0) {
       for (CarbonFile indexDir : indexDirs) {
-        DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
-            CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()),
-            indexDir.getAbsolutePath());
+        DataMapDistributable luceneDataMapDistributable =
+            new LuceneDataMapDistributable(tableIdentifier.getTablePath(),
+                indexDir.getAbsolutePath());
         lstDataMapDistribute.add(luceneDataMapDistributable);
       }
       return lstDataMapDistribute;
@@ -303,9 +301,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
     if (dataMaps.size() > 0) {
       for (TableDataMap dataMap : dataMaps) {
         List<CarbonFile> indexFiles;
-        String dmPath =
-            CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMap
-                .getDataMapSchema().getDataMapName();
+        String dmPath = CarbonTablePath
+            .getDataMapStorePath(tablePath, segmentId, dataMap.getDataMapSchema().getDataMapName());
         FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
         final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
         indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 02d272e..405ff53 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -54,7 +54,7 @@ public class CarbonInputSplit extends FileSplit
   private static final long serialVersionUID = 3520344046772190207L;
   public String taskId;
 
-  private String segmentId;
+  private Segment segment;
 
   private String bucketId;
 
@@ -91,7 +91,7 @@ public class CarbonInputSplit extends FileSplit
   private String dataMapWritePath;
 
   public CarbonInputSplit() {
-    segmentId = null;
+    segment = null;
     taskId = "0";
     bucketId = "0";
     blockletId = "0";
@@ -104,7 +104,7 @@ public class CarbonInputSplit extends FileSplit
       String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles,
       String dataMapWritePath) {
     super(path, start, length, locations);
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
     if (taskNo.contains("_")) {
       taskNo = taskNo.split("_")[0];
@@ -128,7 +128,7 @@ public class CarbonInputSplit extends FileSplit
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
       FileFormat fileFormat) {
     super(path, start, length, locations);
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
@@ -141,7 +141,7 @@ public class CarbonInputSplit extends FileSplit
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
       String[] inMemoryHosts, FileFormat fileFormat) {
     super(path, start, length, locations, inMemoryHosts);
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
@@ -184,8 +184,8 @@ public class CarbonInputSplit extends FileSplit
       try {
         TableBlockInfo blockInfo =
             new TableBlockInfo(split.getPath().toString(), split.blockletId, split.getStart(),
-                split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos,
-                split.getVersion(), split.getDeleteDeltaFiles());
+                split.getSegment().toString(), split.getLocations(), split.getLength(),
+                blockletInfos, split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
         blockInfo.setDataMapWriterPath(split.dataMapWritePath);
         blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
@@ -203,7 +203,7 @@ public class CarbonInputSplit extends FileSplit
     try {
       TableBlockInfo blockInfo =
           new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.blockletId,
-              inputSplit.getStart(), inputSplit.getSegmentId(), inputSplit.getLocations(),
+              inputSplit.getStart(), inputSplit.getSegment().toString(), inputSplit.getLocations(),
               inputSplit.getLength(), blockletInfos, inputSplit.getVersion(),
               inputSplit.getDeleteDeltaFiles());
       blockInfo.setDetailInfo(inputSplit.getDetailInfo());
@@ -215,12 +215,21 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public String getSegmentId() {
-    return segmentId;
+    if (segment != null) {
+      return segment.getSegmentNo();
+    } else {
+      return null;
+    }
   }
 
+  public Segment getSegment() {
+    return segment;
+  }
+
+
   @Override public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    this.segmentId = in.readUTF();
+    this.segment = Segment.toSegment(in.readUTF());
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
     this.bucketId = in.readUTF();
     this.blockletId = in.readUTF();
@@ -247,7 +256,7 @@ public class CarbonInputSplit extends FileSplit
 
   @Override public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeUTF(segmentId);
+    out.writeUTF(segment.toString());
     out.writeShort(version.number());
     out.writeUTF(bucketId);
     out.writeUTF(blockletId);
@@ -323,7 +332,7 @@ public class CarbonInputSplit extends FileSplit
     // get the segment id
    // convert seg ID to double.
 
-    double seg1 = Double.parseDouble(segmentId);
+    double seg1 = Double.parseDouble(segment.getSegmentNo());
     double seg2 = Double.parseDouble(other.getSegmentId());
     if (seg1 - seg2 < 0) {
       return -1;
@@ -381,7 +390,7 @@ public class CarbonInputSplit extends FileSplit
 
   @Override public int hashCode() {
     int result = taskId.hashCode();
-    result = 31 * result + segmentId.hashCode();
+    result = 31 * result + segment.hashCode();
     result = 31 * result + bucketId.hashCode();
     result = 31 * result + invalidSegments.hashCode();
     result = 31 * result + numberOfBlocklets;

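write() now serializes segment.toString() and readFields() reconstructs it with Segment.toSegment, so the two must stay inverses for the split to survive serialization. A round-trip check using the hypothetical MiniSegment sketch from the TableBlockInfo note above (the sample segment file name is made up):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    final class SplitSegmentRoundTrip {
      public static void main(String[] args) throws IOException {
        MiniSegment original = MiniSegment.toSegment("2#2_1528892412345.segment");

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
          out.writeUTF(original.toString());                 // what write() does
        }
        try (DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          MiniSegment restored = MiniSegment.toSegment(in.readUTF()); // what readFields() does
          System.out.println(restored.getSegmentNo());       // 2
          System.out.println(restored.getSegmentFileName()); // 2_1528892412345.segment
        }
      }
    }
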
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 485b087..3688026 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -488,7 +488,7 @@ m filterExpression
       segment.getFilteredIndexShardNames().clear();
       // Check the segment exist in any of the pruned blocklets.
       for (ExtendedBlocklet blocklet : prunedBlocklets) {
-        if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
+        if (blocklet.getSegmentId().equals(segment.toString())) {
           found = true;
           // Set the pruned index file to the segment for further pruning.
           String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 4feb044..b549b16 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -420,6 +420,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<Segment> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
 
+
     try {
       carbonTable = getOrCreateCarbonTable(job.getConfiguration());
       ReadCommittedScope readCommittedScope =
@@ -427,7 +428,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       this.readCommittedScope = readCommittedScope;
 
       List<Segment> segmentList = new ArrayList<>();
-      segmentList.add(new Segment(targetSegment, null, readCommittedScope));
+      Segment segment = Segment.getSegment(targetSegment, carbonTable.getTablePath());
+      segmentList.add(
+          new Segment(segment.getSegmentNo(), segment.getSegmentFileName(), readCommittedScope));
       setSegmentsToAccess(job.getConfiguration(), segmentList);
 
       // process and resolve the expression
@@ -599,7 +602,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
       long rowCount = blocklet.getDetailInfo().getRowCount();
 
-      String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
+      String segmentId = Segment.toSegment(blocklet.getSegmentId()).getSegmentNo();
+      String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
 
       // if block is invalid then dont add the count
       SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
@@ -608,11 +612,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         Long blockCount = blockRowCountMapping.get(key);
         if (blockCount == null) {
           blockCount = 0L;
-          Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
+          Long count = segmentAndBlockCountMapping.get(segmentId);
           if (count == null) {
             count = 0L;
           }
-          segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
+          segmentAndBlockCountMapping.put(segmentId, count + 1);
         }
         blockCount += rowCount;
         blockRowCountMapping.put(key, blockCount);

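The hunks above are two halves of the same adjustment: getSplitsOfOneSegment resolves the target segment (and its segment file, if any) through Segment.getSegment before building the segment list, and the block-count bookkeeping strips the extended id back to a plain segment number before using it as a map key. A minimal sketch of that normalization, not part of this patch and using only the calls visible here:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.carbondata.core.datamap.Segment;

    public class SegmentKeySketch {
      public static void main(String[] args) {
        Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
        // Hypothetical extended id; real values come from
        // ExtendedBlocklet.getSegmentId().
        String extendedId = "2";
        // Normalize to the bare segment number so map keys stay stable
        // whether or not the id carries a segment file name.
        String segmentId = Segment.toSegment(extendedId).getSegmentNo();
        Long count = segmentAndBlockCountMapping.get(segmentId);
        segmentAndBlockCountMapping.put(segmentId, count == null ? 1L : count + 1);
      }
    }
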
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
index 79458f5..f4d7034 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
@@ -3301,7 +3301,12 @@ test("IUD-01-01-02_023-67", Include) {
 //Delete the uniqdata table 
 test("IUD-01-01-02_023-68", Include) {
    sql(s"""use default""").collect
- sql(s"""delete from table uniqdata where segment.id IN(0)""").collect
+ try {
+   sql(s"""delete from table uniqdata where segment.id IN(0)""").collect
+ } catch {
+   case _: Exception =>
+     // ignore as data is already deleted in segment 0
+ }
   checkAnswer(s"""select DOJ from uniqdata where CUST_ID=9001""",
     Seq(Row(Timestamp.valueOf("2012-01-12 03:14:05.0"))), "DataLoadingIUDTestCase_IUD-01-01-02_023-68")
   

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index b027ce2..99a537a 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -18,12 +18,16 @@
 
 package org.apache.carbondata.cluster.sdv.generated
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.common.util._
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -93,35 +97,36 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
     new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0)
-    assert(getMergedIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 1)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1", true) >= 1)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
   }
 
-  private def getIndexFileCount(dbName: String, tableName: String, segment: String): Int = {
-    getFileCount(dbName, tableName, segment, CarbonTablePath.INDEX_FILE_EXT)
-  }
-
-  private def getMergedIndexFileCount(dbName: String, tableName: String, segment: String): Int = {
-    getFileCount(dbName, tableName, segment, CarbonTablePath.MERGE_INDEX_FILE_EXT)
-  }
-
-  private def getFileCount(dbName: String,
-      tableName: String,
-      segment: String,
-      suffix: String): Int = {
+  private def getIndexFileCount(dbName: String, tableName: String, segmentNo: String, mergeIndexCount: Boolean = false): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
-    val identifier = carbonTable.getAbsoluteTableIdentifier
-    val path = CarbonTablePath
-      .getSegmentPath(identifier.getTablePath, segment)
-    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = {
-        file.getName.endsWith(suffix)
-      }
-    })
-    if (carbonFiles != null) {
-      carbonFiles.length
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      val map = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir)
+      map.asScala.map { f =>
+        if (f._2 == null) {
+          1
+        } else {
+          if (mergeIndexCount) 1 else 0
+        }
+      }.sum
     } else {
-      0
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      if (segment != null) {
+        val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+        store.getSegmentFile.getLocationMap.values().asScala.map { f =>
+          if (f.getMergeFileName == null) {
+            f.getFiles.size()
+          } else {
+            if (mergeIndexCount) 1 else 0
+          }
+        }.sum
+      } else {
+        0
+      }
     }
   }
 }
\ No newline at end of file

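The rewritten helper above counts plain index files by default and merged entries when mergeIndexCount is true, reading the physical segment directory when it exists and falling back to the segment file for flat-folder tables. A Java sketch of the first branch, not part of this patch, assuming getIndexFilesFromSegment maps each index file name to its merge file name (null while unmerged), as the Scala code implies:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;

    public class IndexFileCountSketch {
      static int countIndexFiles(String segmentDir, boolean mergeIndexCount)
          throws IOException {
        // The String value type is an assumption; the test only checks for null.
        Map<String, String> indexFiles =
            new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir);
        int count = 0;
        for (String mergeFileName : indexFiles.values()) {
          if (mergeFileName == null) {
            count++;                  // still a standalone .carbonindex file
          } else if (mergeIndexCount) {
            count++;                  // folded into a merge-index file
          }
        }
        return count;
      }
    }
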
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index f64a349..6530ec0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -541,37 +541,51 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test lucene fine grain data map with text-match limit") {
-
+    sql("DROP TABLE IF EXISTS datamap_test_limit")
+    sql(
+      """
+        | CREATE TABLE datamap_test_limit(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
     sql(
       s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
+         | CREATE DATAMAP dm ON TABLE datamap_test_limit
          | USING 'lucene'
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    checkAnswer(sql("select count(*) from datamap_test where TEXT_MATCH_WITH_LIMIT('name:n10*',10)"),Seq(Row(10)))
-    checkAnswer(sql("select count(*) from datamap_test where TEXT_MATCH_WITH_LIMIT('name:n10*',50)"),Seq(Row(50)))
-    sql("drop datamap dm on table datamap_test")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_limit OPTIONS('header'='false')")
+    checkAnswer(sql("select count(*) from datamap_test_limit where TEXT_MATCH_WITH_LIMIT('name:n10*',10)"),Seq(Row(10)))
+    checkAnswer(sql("select count(*) from datamap_test_limit where TEXT_MATCH_WITH_LIMIT('name:n10*',50)"),Seq(Row(50)))
+    sql("drop datamap dm on table datamap_test_limit")
   }
 
   test("test lucene fine grain data map with InsertOverwrite") {
+    sql("DROP TABLE IF EXISTS datamap_test_overwrite")
+    sql(
+      """
+        | CREATE TABLE datamap_test_overwrite(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
     sql(
       s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
+         | CREATE DATAMAP dm ON TABLE datamap_test_overwrite
          | USING 'lucene'
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_overwrite OPTIONS('header'='false')")
     sql(
       """
         | CREATE TABLE table1(id INT, name STRING, city STRING, age INT)
         | STORED BY 'carbondata'
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
-    sql("INSERT OVERWRITE TABLE table1 select * from datamap_test where TEXT_MATCH('name:n*')")
+    sql("INSERT OVERWRITE TABLE table1 select *from datamap_test_overwrite where TEXT_MATCH('name:n*')")
     checkAnswer(sql("select count(*) from table1"),Seq(Row(10000)))
-    sql("drop datamap dm on table datamap_test")
-    sql("drop table table1")
+    sql("drop datamap dm on table datamap_test_overwrite")
   }
 
   test("explain query with lucene datamap") {
@@ -715,7 +729,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
     sql("DROP TABLE IF EXISTS datamap_test5")
   }
-  
+
   test("test text_match on normal table") {
     sql("DROP TABLE IF EXISTS table1")
     sql(
@@ -731,7 +745,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     assert(msg.getCause.getMessage.contains("TEXT_MATCH is not supported on table"))
     sql("DROP TABLE table1")
   }
-  
+
   test("test lucene with flush_cache as true") {
     sql("DROP TABLE IF EXISTS datamap_test_table")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 43b215e..688928f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -26,10 +26,11 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
@@ -53,10 +54,8 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
     val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
     val segments: ArrayBuffer[String] = ArrayBuffer()
-    carbonFile.listFiles.foreach { file =>
-      segments += CarbonTablePath.DataFileUtil.getSegmentId(file.getAbsolutePath + "/dummy")
-    }
-    segments.contains(segmentId)
+    val segment = Segment.getSegment(segmentId, carbonTable.getAbsoluteTableIdentifier.getTablePath)
+    segment != null
   }
 
   test("test data loading CSV file") {

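With flat folders the per-segment directory may not exist on disk, so the test above now asks the segment metadata instead of listing files: Segment.getSegment returns null for an unknown segment (the null contract is inferred from the checks in this patch). A minimal sketch under that assumption:

    import org.apache.carbondata.core.datamap.Segment;

    public class SegmentExistsSketch {
      // tablePath is a hypothetical location; the test takes it from
      // CarbonTable.getAbsoluteTableIdentifier().getTablePath().
      static boolean validateSegment(String segmentId, String tablePath) {
        return Segment.getSegment(segmentId, tablePath) != null;
      }
    }
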
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 4860b32..8487b9e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -16,17 +16,13 @@
  */
 package org.apache.carbondata.spark.testsuite.allqueries
 
-import java.io.File
-
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   var timeStampPropOrig: String = _
@@ -227,11 +223,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("insert overwrite table CarbonOverwrite select * from THive")
     sql("insert overwrite table HiveOverwrite select * from THive")
     checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite")
-    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getAbsoluteTableIdentifier.getTablePath)
-    val folder = new File(partitionPath)
-    assert(folder.isDirectory)
-    assert(folder.list().length == 1)
+    assert(checkSegment("CarbonOverwrite"))
   }
 
   test("Load overwrite") {
@@ -249,12 +241,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
     sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' overwrite INTO TABLE HiveOverwrite")
     checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite")
-    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getAbsoluteTableIdentifier.getTablePath)
-    val folder = new File(partitionPath)
-
-    assert(folder.isDirectory)
-    assert(folder.list().length == 1)
+    assert(checkSegment("TCarbonSourceOverwrite"))
   }
 
   test("Load overwrite fail handle") {
@@ -379,15 +366,9 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
 
 
   private def checkSegment(tableName: String) : Boolean ={
-    val storePath_t1 = s"$storeLocation/${tableName.toLowerCase()}/Fact/Part0"
-    val carbonFile_t1: CarbonFile = FileFactory
-      .getCarbonFile(storePath_t1, FileFactory.getFileType(storePath_t1))
-    var exists: Boolean = carbonFile_t1.exists()
-    if (exists) {
-      val listFiles: Array[CarbonFile] = carbonFile_t1.listFiles()
-      exists = listFiles.size > 0
-    }
-    exists
+    val storePath_t1 = s"$storeLocation/${tableName.toLowerCase()}"
+    val details = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(storePath_t1))
+    details.exists(_.getSegmentStatus == SegmentStatus.SUCCESS)
   }
 
   test("test show segments after clean files for insert overwrite") {