You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/16 02:56:26 UTC

[3/3] carbondata git commit: [CARBONDATA-2428] Support flat folder for managed carbon table

[CARBONDATA-2428] Support flat folder for managed carbon table

Currently, carbondata writing happens in a fixed path, tablepath/Fact/Part0/Segment_NUM, which is not the same as the hive/parquet folder structure. With this PR, all files are written directly inside the table path, without maintaining any segment folder structure. Folders are added only for partitions.

This feature can be controlled through the table property flat_folder. Its default value is false.

This closes #2207


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/60dfdd38
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/60dfdd38
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/60dfdd38

Branch: refs/heads/master
Commit: 60dfdd3857d037231844d9fb95967dcdb0071f40
Parents: 181f0ac
Author: ravipesala <ra...@gmail.com>
Authored: Fri Apr 20 07:34:00 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Jun 16 10:56:10 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   8 ++
 .../apache/carbondata/core/datamap/Segment.java |  20 +++
 .../carbondata/core/datamap/TableDataMap.java   |   4 +-
 .../core/datamap/dev/DataMapWriter.java         |   3 +-
 .../core/datastore/block/TableBlockInfo.java    |  27 ++--
 .../blockletindex/BlockletDataMapFactory.java   |   4 +-
 .../core/metadata/SegmentFileStore.java         |  91 ++++++++++----
 .../core/metadata/schema/table/CarbonTable.java |  13 ++
 .../core/mutate/CarbonUpdateUtil.java           |  71 ++++++-----
 .../executor/impl/AbstractQueryExecutor.java    |  18 ++-
 .../core/scan/result/BlockletScannedResult.java |   3 +-
 .../scan/scanner/impl/BlockletFullScanner.java  |   6 +-
 .../SegmentUpdateStatusManager.java             |  40 +++---
 .../apache/carbondata/core/util/CarbonUtil.java |  18 +--
 .../core/util/path/CarbonTablePath.java         |  77 +++++++++---
 .../datastore/block/TableBlockInfoTest.java     |   2 +-
 .../CarbonFormatDirectoryStructureTest.java     |  22 +---
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  10 +-
 .../lucene/LuceneDataMapFactoryBase.java        |  17 ++-
 .../carbondata/hadoop/CarbonInputSplit.java     |  35 ++++--
 .../hadoop/api/CarbonInputFormat.java           |   2 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  12 +-
 .../sdv/generated/DataLoadingIUDTestCase.scala  |   7 +-
 .../sdv/generated/MergeIndexTestCase.scala      |  55 ++++----
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  36 ++++--
 .../dataload/TestLoadDataGeneral.scala          |   9 +-
 .../InsertIntoCarbonTableTestCase.scala         |  33 ++---
 .../CarbonIndexFileMergeTestCase.scala          |  53 +++++---
 ...ompactionSupportGlobalSortFunctionTest.scala |  15 ++-
 ...mpactionSupportGlobalSortParameterTest.scala |  15 ++-
 .../dataload/TestBatchSortDataLoad.scala        |  18 +--
 .../dataload/TestDataLoadWithFileName.scala     |  26 ++--
 .../dataload/TestGlobalSortDataLoad.scala       |  23 +++-
 .../testsuite/datamap/CGDataMapTestCase.scala   |  10 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |   6 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |  27 ++--
 .../FlatFolderTableLoadingTestCase.scala        | 125 +++++++++++++++++++
 .../iud/UpdateCarbonTableTestCase.scala         |  10 +-
 .../TestDataLoadingForPartitionTable.scala      |  27 ++--
 .../load/DataLoadProcessorStepOnSpark.scala     |   2 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   7 +-
 .../carbondata/spark/util/CommonUtil.scala      |  17 +++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +
 .../org/apache/spark/util/PartitionUtils.scala  |  73 +++++++++--
 .../datamap/IndexDataMapRebuildRDD.scala        |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  89 ++++++++++---
 .../spark/rdd/CarbonTableCompactor.scala        |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   2 +-
 .../command/mutation/DeleteExecution.scala      |  23 ++--
 .../preaaggregate/PreAggregateTableHelper.scala |   3 +
 .../table/CarbonDescribeFormattedCommand.scala  |   5 +
 .../partition/TestAlterPartitionTable.scala     |  35 ++++--
 .../CarbonGetTableDetailComandTestCase.scala    |   4 +-
 .../datamap/DataMapWriterListener.java          |   1 +
 .../loading/AbstractDataLoadProcessorStep.java  |   3 +-
 .../loading/TableProcessingOperations.java      |   6 +-
 .../loading/model/CarbonLoadModel.java          |  21 +++-
 .../merger/AbstractResultProcessor.java         |   6 +-
 .../merger/CompactionResultSortProcessor.java   |   2 +-
 .../merger/RowResultMergerProcessor.java        |   4 +-
 .../partition/spliter/RowResultProcessor.java   |   1 +
 .../store/CarbonFactDataHandlerModel.java       |   6 +-
 .../store/writer/AbstractFactDataWriter.java    |   6 +-
 .../processing/util/CarbonLoaderUtil.java       |   4 +-
 .../store/worker/SearchRequestHandler.java      |  16 ++-
 .../streaming/CarbonStreamRecordWriter.java     |   2 +-
 67 files changed, 961 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 19ff494..2fcf0f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -718,6 +718,11 @@ public final class CarbonCommonConstants {
   public static final String DEFAULT_ENABLE_AUTO_LOAD_MERGE = "false";
 
   /**
+   * DEFAULT_FLAT_FOLDER
+   */
+  public static final String DEFAULT_FLAT_FOLDER = "false";
+
+  /**
    * ZOOKEEPER_ENABLE_LOCK if this is set to true then zookeeper will be used to handle locking
    * mechanism of carbon
    */
@@ -929,6 +934,9 @@ public final class CarbonCommonConstants {
   public static final String TABLE_COMPACTION_PRESERVE_SEGMENTS = "compaction_preserve_segments";
   // table property name of allowed compaction days while compaction
   public static final String TABLE_ALLOWED_COMPACTION_DAYS = "allowed_compaction_days";
+  // Flat folder support on table. when it is true all carbondata files store directly under table
+  // path instead of sub folders.
+  public static final String FLAT_FOLDER = "flat_folder";
 
   /**
    * 16 mb size

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 7b63b84..425cdf6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -162,6 +162,16 @@ public class Segment implements Serializable {
   }
 
   /**
+   * Converts to segment object
+   * @param segmentId
+   * @return
+   */
+  public static Segment toSegment(String segmentId) {
+    // SegmentId can be combination of segmentNo and segmentFileName.
+    return toSegment(segmentId, null);
+  }
+
+  /**
    * Read the table status and get the segment corresponding to segmentNo
    * @param segmentNo
    * @param tablePath
@@ -170,6 +180,16 @@ public class Segment implements Serializable {
   public static Segment getSegment(String segmentNo, String tablePath) {
     LoadMetadataDetails[] loadMetadataDetails =
         SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
+    return getSegment(segmentNo, loadMetadataDetails);
+  }
+
+  /**
+   * Get the segment object corresponding to segmentNo
+   * @param segmentNo
+   * @param loadMetadataDetails
+   * @return
+   */
+  public static Segment getSegment(String segmentNo, LoadMetadataDetails[] loadMetadataDetails) {
     for (LoadMetadataDetails details: loadMetadataDetails) {
       if (details.getLoadName().equals(segmentNo)) {
         return new Segment(details.getLoadName(), details.getSegmentFile());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 4ce0f6c..cb7ec03 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -103,7 +103,7 @@ public final class TableDataMap extends OperationEventListener {
       }
       blocklets.addAll(addSegmentId(
           blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
-          segment.getSegmentNo()));
+          segment.toString()));
     }
     return blocklets;
   }
@@ -182,7 +182,7 @@ public final class TableDataMap extends OperationEventListener {
         detailedBlocklet.setDataMapWriterPath(blockletwritePath);
         serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath);
       }
-      detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo());
+      detailedBlocklet.setSegmentId(distributable.getSegment().toString());
       detailedBlocklets.add(detailedBlocklet);
     }
     return detailedBlocklets;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 89d5d76..8c8d2d8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.datamap.dev;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
@@ -133,7 +132,7 @@ public abstract class DataMapWriter {
    */
   public static String getDefaultDataMapPath(
       String tablePath, String segmentId, String dataMapName) {
-    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+    return CarbonTablePath.getDataMapStorePath(tablePath, segmentId, dataMapName);
   }
 
   public boolean isWritingFinished() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index c0cebe0..34d406b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -64,7 +65,7 @@ public class TableBlockInfo implements Distributable, Serializable {
   /**
    * id of the segment this will be used to sort the blocks
    */
-  private String segmentId;
+  private Segment segment;
 
   /**
    * id of the Blocklet.
@@ -120,7 +121,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockletId = "0";
     this.blockOffset = blockOffset;
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     this.locations = locations;
     this.blockLength = blockLength;
     this.version = version;
@@ -196,7 +197,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     info.filePath = filePath;
     info.blockOffset = blockOffset;
     info.blockLength = blockLength;
-    info.segmentId = segmentId;
+    info.segment = segment;
     info.blockletId = blockletId;
     info.locations = locations;
     info.version = version;
@@ -229,7 +230,15 @@ public class TableBlockInfo implements Distributable, Serializable {
    * @return the segmentId
    */
   public String getSegmentId() {
-    return segmentId;
+    if (segment == null) {
+      return null;
+    } else {
+      return segment.getSegmentNo();
+    }
+  }
+
+  public Segment getSegment() {
+    return segment;
   }
 
   /**
@@ -264,7 +273,7 @@ public class TableBlockInfo implements Distributable, Serializable {
       return false;
     }
     TableBlockInfo other = (TableBlockInfo) obj;
-    if (!segmentId.equals(other.segmentId)) {
+    if (!segment.equals(other.segment)) {
       return false;
     }
     if (blockOffset != other.blockOffset) {
@@ -300,8 +309,8 @@ public class TableBlockInfo implements Distributable, Serializable {
     // get the segment id
     // converr seg ID to double.
 
-    double seg1 = Double.parseDouble(segmentId);
-    double seg2 = Double.parseDouble(((TableBlockInfo) other).segmentId);
+    double seg1 = Double.parseDouble(segment.getSegmentNo());
+    double seg2 = Double.parseDouble(((TableBlockInfo) other).segment.getSegmentNo());
     if (seg1 - seg2 < 0) {
       return -1;
     }
@@ -358,7 +367,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     int result = filePath.hashCode();
     result = 31 * result + (int) (blockOffset ^ (blockOffset >>> 32));
     result = 31 * result + (int) (blockLength ^ (blockLength >>> 32));
-    result = 31 * result + segmentId.hashCode();
+    result = 31 * result + segment.hashCode();
     result = 31 * result + blockletInfos.getStartBlockletNumber();
     return result;
   }
@@ -457,7 +466,7 @@ public class TableBlockInfo implements Distributable, Serializable {
     sb.append("filePath='").append(filePath).append('\'');
     sb.append(", blockOffset=").append(blockOffset);
     sb.append(", blockLength=").append(blockLength);
-    sb.append(", segmentId='").append(segmentId).append('\'');
+    sb.append(", segment='").append(segment.toString()).append('\'');
     sb.append(", blockletId='").append(blockletId).append('\'');
     sb.append(", locations=").append(Arrays.toString(locations));
     sb.append('}');

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index c434e2e..65fcb4b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -122,7 +122,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers =
           BlockletDataMapUtil.getTableBlockUniqueIdentifiers(segment);
-      segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+      if (tableBlockIndexUniqueIdentifiers.size() > 0) {
+        segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
+      }
     }
     return tableBlockIndexUniqueIdentifiers;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index acfc145..0b1c1e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -140,13 +140,16 @@ public class SegmentFileStore {
 
   /**
    * Write segment file to the metadata folder of the table
-   * @param tablePath table path
+   *
+   * @param carbonTable CarbonTable
    * @param segmentId segment id
-   * @param UUID a UUID string used to construct the segment file name
+   * @param UUID      a UUID string used to construct the segment file name
    * @return segment file name
    */
-  public static String writeSegmentFile(String tablePath, String segmentId, String UUID)
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID)
       throws IOException {
+    String tablePath = carbonTable.getTablePath();
+    boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
     String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
     CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
     CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
@@ -167,9 +170,12 @@ public class SegmentFileStore {
           folderDetails.getFiles().add(file.getName());
         }
       }
-      String segmentRelativePath = segmentPath.substring(tablePath.length(), segmentPath.length());
+      String segmentRelativePath = "/";
+      if (!supportFlatFolder) {
+        segmentRelativePath = segmentPath.substring(tablePath.length(), segmentPath.length());
+      }
       segmentFile.addPath(segmentRelativePath, folderDetails);
-      String segmentFileFolder =  CarbonTablePath.getSegmentFilesLocation(tablePath);
+      String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
       CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
       if (!carbonFile.exists()) {
         carbonFile.mkdirs(segmentFileFolder, FileFactory.getFileType(segmentFileFolder));
@@ -177,12 +183,31 @@ public class SegmentFileStore {
       String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
       // write segment info to new file.
       writeSegmentFile(segmentFile, segmentFileFolder + File.separator + segmentFileName);
+
+      // Move all files to table path from segment folder.
+      if (supportFlatFolder) {
+        moveFromTempFolder(segmentPath, tablePath);
+      }
+
       return segmentFileName;
     }
     return null;
   }
 
   /**
+   * Move the loaded data from source folder to destination folder.
+   */
+  private static void moveFromTempFolder(String source, String dest) {
+
+    CarbonFile oldFolder = FileFactory.getCarbonFile(source);
+    CarbonFile[] oldFiles = oldFolder.listFiles();
+    for (CarbonFile file : oldFiles) {
+      file.renameForce(dest + CarbonCommonConstants.FILE_SEPARATOR + file.getName());
+    }
+    oldFolder.delete();
+  }
+
+  /**
    * Writes the segment file in json format
    * @param segmentFile
    * @param path
@@ -218,26 +243,37 @@ public class SegmentFileStore {
       throws IOException {
     CarbonFile[] segmentFiles = getSegmentFiles(readPath);
     if (segmentFiles != null && segmentFiles.length > 0) {
-      SegmentFile segmentFile = null;
-      for (CarbonFile file : segmentFiles) {
-        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
-        if (segmentFile == null && localSegmentFile != null) {
-          segmentFile = localSegmentFile;
-        }
-        if (localSegmentFile != null) {
-          segmentFile = segmentFile.merge(localSegmentFile);
-        }
-      }
-      if (segmentFile != null) {
-        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
-        writeSegmentFile(segmentFile, path);
-        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
-      }
+      SegmentFile segmentFile = mergeSegmentFiles(mergeFileName, writePath, segmentFiles);
+      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
       return segmentFile;
     }
     return null;
   }
 
+  public static SegmentFile mergeSegmentFiles(String mergeFileName, String writePath,
+      CarbonFile[] segmentFiles) throws IOException {
+    SegmentFile segmentFile = null;
+    for (CarbonFile file : segmentFiles) {
+      SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+      if (segmentFile == null && localSegmentFile != null) {
+        segmentFile = localSegmentFile;
+      }
+      if (localSegmentFile != null) {
+        segmentFile = segmentFile.merge(localSegmentFile);
+      }
+    }
+    if (segmentFile != null) {
+      String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+      writeSegmentFile(segmentFile, path);
+    }
+    return segmentFile;
+  }
+
+  public static String getSegmentFilePath(String tablePath, String segmentFileName) {
+    return CarbonTablePath.getSegmentFilesLocation(tablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + segmentFileName;
+  }
+
   /**
    * This API will update the segmentFile of a passed segment.
    *
@@ -248,6 +284,9 @@ public class SegmentFileStore {
       throws IOException {
     boolean status = false;
     String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
+    if (!FileFactory.isFileExist(tableStatusPath)) {
+      return status;
+    }
     String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
     AbsoluteTableIdentifier absoluteTableIdentifier =
         AbsoluteTableIdentifier.from(tablePath, null, null);
@@ -654,7 +693,10 @@ public class SegmentFileStore {
             new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
         fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
         if (forceDelete) {
-          deletePhysicalPartition(partitionSpecs, fileStore.getIndexFilesMap());
+          deletePhysicalPartition(
+              partitionSpecs,
+              fileStore.getIndexFilesMap(),
+              table.getTablePath());
         }
         for (Map.Entry<String, List<String>> entry : fileStore.indexFilesMap.entrySet()) {
           String indexFile = entry.getKey();
@@ -698,7 +740,7 @@ public class SegmentFileStore {
         FileFactory.deleteFile(file, FileFactory.getFileType(file));
       }
     }
-    deletePhysicalPartition(partitionSpecs, indexFilesMap);
+    deletePhysicalPartition(partitionSpecs, indexFilesMap, tablePath);
     String segmentFilePath =
         CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
             + segmentFile;
@@ -713,7 +755,7 @@ public class SegmentFileStore {
    * If partition specs are null, then directly delete parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap) throws IOException {
+      Map<String, List<String>> locationMap, String tablePath) throws IOException {
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
       if (partitionSpecs != null) {
         Path location = new Path(entry.getKey());
@@ -733,7 +775,8 @@ public class SegmentFileStore {
         Path location = new Path(entry.getKey()).getParent();
         // delete the segment folder
         CarbonFile segmentPath = FileFactory.getCarbonFile(location.toString());
-        if (null != segmentPath) {
+        if (null != segmentPath && segmentPath.exists() &&
+            !new Path(tablePath).equals(new Path(segmentPath.getAbsolutePath()))) {
           FileFactory.deleteAllCarbonFilesOfDir(segmentPath);
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 20bc7a1..f48ada0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1008,6 +1008,19 @@ public class CarbonTable implements Serializable {
   }
 
   /**
+   * Whether this table supports flat folder structure, it means all data files directly written
+   * under table path
+   */
+  public boolean isSupportFlatFolder() {
+    boolean supportFlatFolder = Boolean.parseBoolean(CarbonCommonConstants.DEFAULT_FLAT_FOLDER);
+    Map<String, String> tblProps = getTableInfo().getFactTable().getTableProperties();
+    if (tblProps.containsKey(CarbonCommonConstants.FLAT_FOLDER)) {
+      supportFlatFolder = tblProps.get(CarbonCommonConstants.FLAT_FOLDER).equalsIgnoreCase("true");
+    }
+    return supportFlatFolder;
+  }
+
+  /**
    * update the carbon table by using the passed tableInfo
    *
    * @param table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 1d0ef44..40d498c 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -47,6 +47,8 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.hadoop.fs.Path;
+
 /**
  * This class contains all update utility methods
  */
@@ -78,20 +80,17 @@ public class CarbonUpdateUtil {
 
   /**
    * Returns block path from tuple id
-   *
-   * @param tid
-   * @param factPath
-   * @return
    */
-  public static String getTableBlockPath(String tid, String factPath, boolean isPartitionTable) {
+  public static String getTableBlockPath(String tid, String tablePath, boolean isSegmentFile) {
     String partField = getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID);
-    if (isPartitionTable) {
-      return factPath + CarbonCommonConstants.FILE_SEPARATOR + partField;
+    // If it has segment file then partfield can be appended directly to table path
+    if (isSegmentFile) {
+      return tablePath + CarbonCommonConstants.FILE_SEPARATOR + partField.replace("#", "/");
     }
     String part = CarbonTablePath.addPartPrefix(partField);
     String segment =
             CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
-    return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
+    return CarbonTablePath.getFactDir(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + part
             + CarbonCommonConstants.FILE_SEPARATOR + segment;
   }
 
@@ -386,29 +385,45 @@ public class CarbonUpdateUtil {
     return segmentName.split(CarbonCommonConstants.UNDERSCORE)[1];
   }
 
-  public static long getLatestTaskIdForSegment(String segmentId, String tablePath) {
-    String segmentDirPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+  public static long getLatestTaskIdForSegment(Segment segment, String tablePath)
+      throws IOException {
+    long max = 0;
+    List<String> dataFiles = new ArrayList<>();
+    if (segment.getSegmentFileName() != null) {
+      SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+      fileStore.readIndexFiles();
+      Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+      List<String> dataFilePaths = new ArrayList<>();
+      for (List<String> paths : indexFilesMap.values()) {
+        dataFilePaths.addAll(paths);
+      }
+      for (String dataFilePath : dataFilePaths) {
+        dataFiles.add(new Path(dataFilePath).getName());
+      }
 
-    // scan all the carbondata files and get the latest task ID.
-    CarbonFile segment =
-            FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
-    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
+    } else {
+      String segmentDirPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
+      // scan all the carbondata files and get the latest task ID.
+      CarbonFile segmentDir =
+          FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+      CarbonFile[] carbonDataFiles = segmentDir.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
 
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
+          if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+            return true;
+          }
+          return false;
         }
-        return false;
+      });
+      for (CarbonFile carbonDataFile : carbonDataFiles) {
+        dataFiles.add(carbonDataFile.getName());
       }
-    });
-    long max = 0;
-    if (null != dataFiles) {
-      for (CarbonFile file : dataFiles) {
-        long taskNumber =
-            Long.parseLong(CarbonTablePath.DataFileUtil.getTaskNo(file.getName()).split("_")[0]);
-        if (taskNumber > max) {
-          max = taskNumber;
-        }
+    }
+    for (String name : dataFiles) {
+      long taskNumber =
+          Long.parseLong(CarbonTablePath.DataFileUtil.getTaskNo(name).split("_")[0]);
+      if (taskNumber > max) {
+        max = taskNumber;
       }
     }
     // return max task No
@@ -562,7 +577,7 @@ public class CarbonUpdateUtil {
     List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
     for (Segment segment : segmentFilesToBeUpdated) {
       String file =
-          SegmentFileStore.writeSegmentFile(table.getTablePath(), segment.getSegmentNo(), UUID);
+          SegmentFileStore.writeSegmentFile(table, segment.getSegmentNo(), UUID);
       segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), file));
     }
     if (segmentFilesToBeUpdated.size() > 0) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index ff0e5ce..2bbe75c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.common.logging.impl.StandardLogService;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
@@ -68,6 +69,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -256,7 +258,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
               dataRefNode.numberOfNodes(),
               dataRefNode.getBlockInfos().get(0).getFilePath(),
               dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
-              dataRefNode.getBlockInfos().get(0).getSegmentId()));
+              dataRefNode.getBlockInfos().get(0).getSegment()));
     }
     if (null != queryModel.getStatisticsRecorder()) {
       QueryStatistic queryStatistic = new QueryStatistic();
@@ -278,7 +280,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
       AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath,
-      String[] deleteDeltaFiles, String segmentId)
+      String[] deleteDeltaFiles, Segment segment)
       throws QueryExecutionException {
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
     SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
@@ -291,9 +293,15 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
             queryModel.getProjectionDimensions(), tableBlockDimensions,
             segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
             queryModel.getTable().getTableInfo().isTransactionalTable());
-    blockExecutionInfo.setBlockId(
-        CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
-            queryModel.getTable().getTableInfo().isTransactionalTable()));
+    String blockId = CarbonUtil
+        .getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segment.getSegmentNo(),
+            queryModel.getTable().getTableInfo().isTransactionalTable(),
+            segment.getSegmentFileName() != null);
+    if (segment.getSegmentFileName() != null) {
+      blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockIdForPartitionTable(blockId));
+    } else {
+      blockExecutionInfo.setBlockId(CarbonTablePath.getShortBlockId(blockId));
+    }
     blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index b85945f..eadd502 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
 import org.apache.carbondata.core.stats.QueryStatisticsModel;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * Scanned result class which will store and provide the result on request
@@ -525,7 +524,7 @@ public abstract class BlockletScannedResult {
    * "Part0/Segment_0/part-0-0_batchno0-0-1517155583332.carbondata/0"
    */
   public void setBlockletId(String blockletId) {
-    this.blockletId = CarbonTablePath.getShortBlockId(blockletId);
+    this.blockletId = blockletId;
     blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID);
    // if deleted records map is present for this block
     // then get the first page deleted vo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index a48804c..1665ce6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -82,9 +82,9 @@ public class BlockletFullScanner implements BlockletScanner {
         .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED);
     totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED,
         totalPagesScanned.getCount() + rawBlockletColumnChunks.getDataBlock().numberOfPages());
-    scannedResult.setBlockletId(
-        blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR +
-            rawBlockletColumnChunks.getDataBlock().blockletIndex());
+    String blockletId = blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR
+        + rawBlockletColumnChunks.getDataBlock().blockletIndex();
+    scannedResult.setBlockletId(blockletId);
     if (!blockExecutionInfo.isPrefetchBlocklet()) {
       readBlocklet(rawBlockletColumnChunks);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index c2faadc..eb850e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -69,7 +69,6 @@ public class SegmentUpdateStatusManager {
   private LoadMetadataDetails[] segmentDetails;
   private SegmentUpdateDetails[] updateDetails;
   private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
-  private boolean isPartitionTable;
 
   public SegmentUpdateStatusManager(CarbonTable table,
       LoadMetadataDetails[] segmentDetails) {
@@ -77,7 +76,6 @@ public class SegmentUpdateStatusManager {
     // current it is used only for read function scenarios, as file update always requires to work
     // on latest file status.
     this.segmentDetails = segmentDetails;
-    isPartitionTable = table.isHivePartitionTable();
     updateDetails = readLoadMetadata();
     populateMap();
   }
@@ -93,7 +91,6 @@ public class SegmentUpdateStatusManager {
       segmentDetails = SegmentStatusManager.readLoadMetadata(
           CarbonTablePath.getMetadataPath(identifier.getTablePath()));
     }
-    isPartitionTable = table.isHivePartitionTable();
     if (segmentDetails.length != 0) {
       updateDetails = readLoadMetadata();
     } else {
@@ -249,27 +246,37 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true);
+    String segmentFile = null;
+    for (LoadMetadataDetails segmentDetail : segmentDetails) {
+      if (segmentDetail.getLoadName().equals(segmentId)) {
+        segmentFile = segmentDetail.getSegmentFile();
+        break;
+      }
+    }
+    String blockId =
+        CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true, segmentFile != null);
     String tupleId;
-    if (isPartitionTable) {
+    if (segmentFile != null) {
       tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
     } else {
       tupleId = CarbonTablePath.getShortBlockId(blockId);
     }
-    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT, segmentFile)
         .toArray(new String[0]);
   }
 
   /**
    * Returns all delta file paths of specified block
    */
-  private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
+  private List<String> getDeltaFiles(String tupleId, String extension, String segmentFile)
+      throws Exception {
     String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
     String completeBlockName = CarbonTablePath.addDataPartPrefix(
         CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
             + CarbonCommonConstants.FACT_FILE_EXT);
+
     String blockPath;
-    if (isPartitionTable) {
+    if (segmentFile != null) {
       blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
           + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
           .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
@@ -283,7 +290,8 @@ public class SegmentUpdateStatusManager {
     if (!file.exists()) {
       throw new Exception("Invalid tuple id " + tupleId);
     }
-    String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.'));
+    String blockNameWithoutExtn =
+        completeBlockName.substring(0, completeBlockName.lastIndexOf('.'));
     //blockName without timestamp
     final String blockNameFromTuple =
         blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
@@ -363,15 +371,15 @@ public class SegmentUpdateStatusManager {
         public boolean accept(CarbonFile pathName) {
           String fileName = pathName.getName();
           if (fileName.endsWith(extension) && pathName.getSize() > 0) {
-            String firstPart = fileName.substring(0, fileName.indexOf('.'));
+            String firstPart = fileName.substring(0, fileName.lastIndexOf('.'));
             String blockName =
-                    firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
+                firstPart.substring(0, firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
             long timestamp = Long.parseLong(firstPart
-                    .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
-                            firstPart.length()));
+                .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                    firstPart.length()));
             if (blockNameFromTuple.equals(blockName) && (
-                    (Long.compare(timestamp, deltaEndTimeStamp) <= 0) && (
-                            Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
+                (Long.compare(timestamp, deltaEndTimeStamp) <= 0) && (
+                    Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
               return true;
             }
           }
@@ -479,7 +487,7 @@ public class SegmentUpdateStatusManager {
 
       String fileName = eachFile.getName();
       if (fileName.endsWith(fileExtension)) {
-        String firstPart = fileName.substring(0, fileName.indexOf('.'));
+        String firstPart = fileName.substring(0, fileName.lastIndexOf('.'));
 
         long timestamp = Long.parseLong(firstPart
             .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 2aa4a05..836b193 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2973,24 +2973,28 @@ public final class CarbonUtil {
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
-      String segmentId, boolean isTransactionalTable) {
+      String segmentId, boolean isTransactionalTable, boolean hasSegmentFile) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
 
     if (filePath.startsWith(tablePath)) {
-      String factDir = CarbonTablePath.getFactDir(tablePath);
-      if (filePath.startsWith(factDir) || !isTransactionalTable) {
+      if (!isTransactionalTable || !hasSegmentFile) {
         blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
             + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       } else {
         // This is the case with partition table.
-        String partitionDir =
-            filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
-
+        String partitionDir;
+        if (tablePath.length() + 1 < filePath.length() - blockName.length() - 1) {
+          partitionDir =
+              filePath.substring(tablePath.length() + 1,
+                  filePath.length() - blockName.length() - 1);
+        } else {
+          partitionDir = "";
+        }
        // Replace / with # on partition directory to support multi level partitioning. And access
         // them all as a single entity.
-        blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_"
+        blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR
             + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       }
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index e8a121c..fe68adf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -228,7 +228,7 @@ public class CarbonTablePath {
   public static String getCarbonDataFilePath(String tablePath, String segmentId, Integer filePartNo,
       Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
     return getSegmentPath(tablePath, segmentId) + File.separator + getCarbonDataFileName(
-        filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
+        filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp, segmentId);
   }
 
   /**
@@ -283,7 +283,7 @@ public class CarbonTablePath {
       default:
         String segmentDir = getSegmentPath(tablePath, segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(taskId,
-            Integer.parseInt(bucketNumber), timeStamp);
+            Integer.parseInt(bucketNumber), timeStamp, segmentId);
     }
   }
 
@@ -297,16 +297,17 @@ public class CarbonTablePath {
       default:
         String segmentDir = getSegmentPath(tablePath, segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId),
-            Integer.parseInt(bucketNumber), batchNo, timeStamp);
+            Integer.parseInt(bucketNumber), batchNo, timeStamp, segmentId);
     }
   }
 
   private static String getCarbonIndexFileName(String taskNo, int bucketNumber,
-      String factUpdatedtimeStamp) {
+      String factUpdatedtimeStamp, String segmentNo) {
     if (bucketNumber == -1) {
-      return taskNo + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
+      return taskNo + "-" + segmentNo + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
     }
-    return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
+    return taskNo + "-" + bucketNumber + "-" + segmentNo + "-" + factUpdatedtimeStamp
+        + INDEX_FILE_EXT;
   }
 
   /**
@@ -325,14 +326,15 @@ public class CarbonTablePath {
    * @return gets data file name only with out path
    */
   public static String getCarbonDataFileName(Integer filePartNo, Long taskNo, int bucketNumber,
-      int batchNo, String factUpdateTimeStamp) {
+      int batchNo, String factUpdateTimeStamp, String segmentNo) {
     return DATA_PART_PREFIX + filePartNo + "-" + taskNo + BATCH_PREFIX + batchNo + "-"
-        + bucketNumber + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
+        + bucketNumber + "-" + segmentNo + "-" + factUpdateTimeStamp + CARBON_DATA_EXT;
   }
 
   public static String getShardName(Long taskNo, int bucketNumber, int batchNo,
-      String factUpdateTimeStamp) {
-    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdateTimeStamp;
+      String factUpdateTimeStamp, String segmentNo) {
+    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + segmentNo + "-"
+        + factUpdateTimeStamp;
   }
 
   /**
@@ -343,13 +345,13 @@ public class CarbonTablePath {
    * @return filename
    */
   public static String getCarbonIndexFileName(long taskNo, int bucketNumber, int batchNo,
-      String factUpdatedTimeStamp) {
-    return taskNo + BATCH_PREFIX + batchNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp
+      String factUpdatedTimeStamp, String segmentNo) {
+    return getShardName(taskNo, bucketNumber, batchNo, factUpdatedTimeStamp, segmentNo)
         + INDEX_FILE_EXT;
   }
 
   public static String getCarbonStreamIndexFileName() {
-    return getCarbonIndexFileName(0, 0, 0, "0");
+    return getCarbonIndexFileName(0, 0, 0, "0", "0");
   }
 
   public static String getCarbonStreamIndexFilePath(String segmentDir) {
@@ -408,11 +410,25 @@ public class CarbonTablePath {
   public static String getDataMapStorePathOnShardName(String tablePath, String segmentId,
       String dataMapName, String shardName) {
     return new StringBuilder()
-        .append(getSegmentPath(tablePath, segmentId))
+        .append(getDataMapStorePath(tablePath, segmentId, dataMapName))
+        .append(File.separator)
+        .append(shardName)
+        .toString();
+  }
+
+  /**
+   * Return store path for datamap based on the dataMapName,
+   *
+   * @return store path based on datamapname
+   */
+  public static String getDataMapStorePath(String tablePath, String segmentId,
+      String dataMapName) {
+    return new StringBuilder()
+        .append(tablePath)
         .append(File.separator)
         .append(dataMapName)
         .append(File.separator)
-        .append(shardName)
+        .append(segmentId)
         .toString();
   }
 
@@ -517,6 +533,29 @@ public class CarbonTablePath {
     }
 
     /**
+     * Return the segment number extracted from the given carbon data file name
+     */
+    public static String getSegmentNo(String carbonDataFileName) {
+      // Get the file name from path
+      String fileName = getFileName(carbonDataFileName);
+      // + 1 for size of "-"
+      int firstDashPos = fileName.indexOf("-");
+      int startIndex1 = fileName.indexOf("-", firstDashPos + 1) + 1;
+      int endIndex1 = fileName.indexOf("-", startIndex1);
+      int startIndex = fileName.indexOf("-", endIndex1 + 1);
+      if (startIndex > -1) {
+        startIndex += 1;
+        int endIndex = fileName.indexOf("-", startIndex);
+        if (endIndex == -1) {
+          return null;
+        }
+        return fileName.substring(startIndex, endIndex);
+      } else {
+        return null;
+      }
+    }
+
+    /**
      * Return the taskId part from taskNo(include taskId + batchNo)
      */
     public static long getTaskIdFromTaskNo(String taskNo) {
@@ -545,7 +584,7 @@ public class CarbonTablePath {
     /**
     * gets segment id from given absolute data file path
      */
-    public static String getSegmentId(String dataFileAbsolutePath) {
+    public static String getSegmentIdFromPath(String dataFileAbsolutePath) {
       // find segment id from last of data file path
       String tempdataFileAbsolutePath = dataFileAbsolutePath.replace(
           CarbonCommonConstants.WINDOWS_FILE_SEPARATOR, CarbonCommonConstants.FILE_SEPARATOR);
@@ -647,8 +686,7 @@ public class CarbonTablePath {
    * @return shortBlockId
    */
   public static String getShortBlockIdForPartitionTable(String blockId) {
-    return blockId.replace(SEGMENT_PREFIX, "")
-        .replace(DATA_PART_PREFIX, "")
+    return blockId.replace(DATA_PART_PREFIX, "")
         .replace(CARBON_DATA_EXT, "");
   }
 
@@ -687,7 +725,8 @@ public class CarbonTablePath {
    */
   public static String getShardName(String actualBlockName) {
     return DataFileUtil.getTaskNo(actualBlockName) + "-" + DataFileUtil.getBucketNo(actualBlockName)
-        + "-" + DataFileUtil.getTimeStampFromFileName(actualBlockName);
+        + "-" + DataFileUtil.getSegmentNo(actualBlockName) + "-" + DataFileUtil
+        .getTimeStampFromFileName(actualBlockName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
index ecdaf3d..e61ea6f 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/block/TableBlockInfoTest.java
@@ -169,7 +169,7 @@ public class TableBlockInfoTest {
 
   @Test public void hashCodeTest() {
     int res = tableBlockInfo.hashCode();
-    int expectedResult = 1041505621;
+    int expectedResult = 1041506582;
     assertEquals(res, expectedResult);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 4293536..e52c737 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -34,26 +34,6 @@ public class CarbonFormatDirectoryStructureTest {
 
   private final String CARBON_STORE = "/opt/carbonstore";
 
-  /**
-   * test table path methods
-   */
-  @Test public void testTablePathStructure() throws IOException {
-    CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("d1", "t1", UUID.randomUUID().toString());
-    AbsoluteTableIdentifier identifier =
-        AbsoluteTableIdentifier.from(CARBON_STORE + "/d1/t1", tableIdentifier);
-    assertTrue(identifier.getTablePath().replace("\\", "/").equals(CARBON_STORE + "/d1/t1"));
-    assertTrue(CarbonTablePath.getSchemaFilePath(identifier.getTablePath()).replace("\\", "/").equals(CARBON_STORE + "/d1/t1/Metadata/schema"));
-    assertTrue(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()).replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/tablestatus"));
-    assertTrue(CarbonTablePath.getDictionaryFilePath(identifier.getTablePath(), "t1_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dict"));
-    assertTrue(CarbonTablePath.getDictionaryMetaFilePath(identifier.getTablePath(), "t1_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
-    assertTrue(CarbonTablePath.getSortIndexFilePath(identifier.getTablePath(),"t1_c1").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(CarbonTablePath.getCarbonDataFilePath(identifier.getTablePath(), "2", 3, 4L,  0, 0, "999").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata"));
-  }
 
   /**
    * test data file name
@@ -67,5 +47,7 @@ public class CarbonFormatDirectoryStructureTest {
     assertTrue(CarbonTablePath.DataFileUtil.getTaskNo("/opt/apache-carbon/part-3-4-999.carbondata").equals("4"));
     assertTrue(
         CarbonTablePath.DataFileUtil.getTimeStampFromFileName("/opt/apache-carbon/part-3-4-999.carbondata").equals("999"));
+    assertTrue(CarbonTablePath.DataFileUtil.getSegmentNo("part-3-4-0-999.carbondata") == null);
+    assertTrue(CarbonTablePath.DataFileUtil.getSegmentNo("part-3-4-0-0-999.carbondata").equals("0"));
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 3231551..cda49b3 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.datamap.bloom;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -259,8 +258,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
     if (dataMaps.size() > 0) {
       for (TableDataMap dataMap : dataMaps) {
         List<CarbonFile> indexFiles;
-        String dmPath = CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator
-            + dataMap.getDataMapSchema().getDataMapName();
+        String dmPath = CarbonTablePath
+            .getDataMapStorePath(tablePath, segmentId, dataMap.getDataMapSchema().getDataMapName());
         FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
         final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
         indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {
@@ -323,9 +322,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
       List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
       for (Segment segment : validSegments) {
         String segmentId = segment.getSegmentNo();
-        String datamapPath = CarbonTablePath.getSegmentPath(
-            getCarbonTable().getAbsoluteTableIdentifier().getTablePath(), segmentId)
-            + File.separator + dataMapName;
+        String datamapPath = CarbonTablePath
+            .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId, dataMapName);
         if (FileFactory.isFileExist(datamapPath)) {
           CarbonFile file = FileFactory.getCarbonFile(datamapPath,
               FileFactory.getFileType(datamapPath));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 1da8edd..cc14dc4 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.datamap.lucene;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -182,9 +181,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
       List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
       for (Segment segment : validSegments) {
         String segmentId = segment.getSegmentNo();
-        String datamapPath =
-            CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId)
-                + File.separator + dataMapName;
+        String datamapPath = CarbonTablePath
+            .getDataMapStorePath(tableIdentifier.getTablePath(), segmentId, dataMapName);
         if (FileFactory.isFileExist(datamapPath)) {
           CarbonFile file =
               FileFactory.getCarbonFile(datamapPath, FileFactory.getFileType(datamapPath));
@@ -227,9 +225,9 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
         getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo());
     if (segment.getFilteredIndexShardNames().size() == 0) {
       for (CarbonFile indexDir : indexDirs) {
-        DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
-            CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()),
-            indexDir.getAbsolutePath());
+        DataMapDistributable luceneDataMapDistributable =
+            new LuceneDataMapDistributable(tableIdentifier.getTablePath(),
+                indexDir.getAbsolutePath());
         lstDataMapDistribute.add(luceneDataMapDistributable);
       }
       return lstDataMapDistribute;
@@ -303,9 +301,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor
     if (dataMaps.size() > 0) {
       for (TableDataMap dataMap : dataMaps) {
         List<CarbonFile> indexFiles;
-        String dmPath =
-            CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMap
-                .getDataMapSchema().getDataMapName();
+        String dmPath = CarbonTablePath
+            .getDataMapStorePath(tablePath, segmentId, dataMap.getDataMapSchema().getDataMapName());
         FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
         final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
         indexFiles = Arrays.asList(dirPath.listFiles(new CarbonFileFilter() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 02d272e..405ff53 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -54,7 +54,7 @@ public class CarbonInputSplit extends FileSplit
   private static final long serialVersionUID = 3520344046772190207L;
   public String taskId;
 
-  private String segmentId;
+  private Segment segment;
 
   private String bucketId;
 
@@ -91,7 +91,7 @@ public class CarbonInputSplit extends FileSplit
   private String dataMapWritePath;
 
   public CarbonInputSplit() {
-    segmentId = null;
+    segment = null;
     taskId = "0";
     bucketId = "0";
     blockletId = "0";
@@ -104,7 +104,7 @@ public class CarbonInputSplit extends FileSplit
       String[] locations, ColumnarFormatVersion version, String[] deleteDeltaFiles,
       String dataMapWritePath) {
     super(path, start, length, locations);
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
     if (taskNo.contains("_")) {
       taskNo = taskNo.split("_")[0];
@@ -128,7 +128,7 @@ public class CarbonInputSplit extends FileSplit
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
       FileFormat fileFormat) {
     super(path, start, length, locations);
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
@@ -141,7 +141,7 @@ public class CarbonInputSplit extends FileSplit
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
       String[] inMemoryHosts, FileFormat fileFormat) {
     super(path, start, length, locations, inMemoryHosts);
-    this.segmentId = segmentId;
+    this.segment = Segment.toSegment(segmentId);
     this.fileFormat = fileFormat;
     taskId = "0";
     bucketId = "0";
@@ -184,8 +184,8 @@ public class CarbonInputSplit extends FileSplit
       try {
         TableBlockInfo blockInfo =
             new TableBlockInfo(split.getPath().toString(), split.blockletId, split.getStart(),
-                split.getSegmentId(), split.getLocations(), split.getLength(), blockletInfos,
-                split.getVersion(), split.getDeleteDeltaFiles());
+                split.getSegment().toString(), split.getLocations(), split.getLength(),
+                blockletInfos, split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
         blockInfo.setDataMapWriterPath(split.dataMapWritePath);
         blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
@@ -203,7 +203,7 @@ public class CarbonInputSplit extends FileSplit
     try {
       TableBlockInfo blockInfo =
           new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.blockletId,
-              inputSplit.getStart(), inputSplit.getSegmentId(), inputSplit.getLocations(),
+              inputSplit.getStart(), inputSplit.getSegment().toString(), inputSplit.getLocations(),
               inputSplit.getLength(), blockletInfos, inputSplit.getVersion(),
               inputSplit.getDeleteDeltaFiles());
       blockInfo.setDetailInfo(inputSplit.getDetailInfo());
@@ -215,12 +215,21 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public String getSegmentId() {
-    return segmentId;
+    if (segment != null) {
+      return segment.getSegmentNo();
+    } else {
+      return null;
+    }
   }
 
+  public Segment getSegment() {
+    return segment;
+  }
+
+
   @Override public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    this.segmentId = in.readUTF();
+    this.segment = Segment.toSegment(in.readUTF());
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
     this.bucketId = in.readUTF();
     this.blockletId = in.readUTF();
@@ -247,7 +256,7 @@ public class CarbonInputSplit extends FileSplit
 
   @Override public void write(DataOutput out) throws IOException {
     super.write(out);
-    out.writeUTF(segmentId);
+    out.writeUTF(segment.toString());
     out.writeShort(version.number());
     out.writeUTF(bucketId);
     out.writeUTF(blockletId);
@@ -323,7 +332,7 @@ public class CarbonInputSplit extends FileSplit
     // get the segment id
    // convert seg ID to double.
 
-    double seg1 = Double.parseDouble(segmentId);
+    double seg1 = Double.parseDouble(segment.getSegmentNo());
     double seg2 = Double.parseDouble(other.getSegmentId());
     if (seg1 - seg2 < 0) {
       return -1;
@@ -381,7 +390,7 @@ public class CarbonInputSplit extends FileSplit
 
   @Override public int hashCode() {
     int result = taskId.hashCode();
-    result = 31 * result + segmentId.hashCode();
+    result = 31 * result + segment.hashCode();
     result = 31 * result + bucketId.hashCode();
     result = 31 * result + invalidSegments.hashCode();
     result = 31 * result + numberOfBlocklets;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 485b087..3688026 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -488,7 +488,7 @@ m filterExpression
       segment.getFilteredIndexShardNames().clear();
       // Check the segment exist in any of the pruned blocklets.
       for (ExtendedBlocklet blocklet : prunedBlocklets) {
-        if (blocklet.getSegmentId().equals(segment.getSegmentNo())) {
+        if (blocklet.getSegmentId().equals(segment.toString())) {
           found = true;
           // Set the pruned index file to the segment for further pruning.
           String shardName = CarbonTablePath.getShardName(blocklet.getFilePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 4feb044..b549b16 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -420,6 +420,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     List<Segment> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
 
+
     try {
       carbonTable = getOrCreateCarbonTable(job.getConfiguration());
       ReadCommittedScope readCommittedScope =
@@ -427,7 +428,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       this.readCommittedScope = readCommittedScope;
 
       List<Segment> segmentList = new ArrayList<>();
-      segmentList.add(new Segment(targetSegment, null, readCommittedScope));
+      Segment segment = Segment.getSegment(targetSegment, carbonTable.getTablePath());
+      segmentList.add(
+          new Segment(segment.getSegmentNo(), segment.getSegmentFileName(), readCommittedScope));
       setSegmentsToAccess(job.getConfiguration(), segmentList);
 
       // process and resolve the expression
@@ -599,7 +602,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
       long rowCount = blocklet.getDetailInfo().getRowCount();
 
-      String key = CarbonUpdateUtil.getSegmentBlockNameKey(blocklet.getSegmentId(), blockName);
+      String segmentId = Segment.toSegment(blocklet.getSegmentId()).getSegmentNo();
+      String key = CarbonUpdateUtil.getSegmentBlockNameKey(segmentId, blockName);
 
       // if block is invalid then dont add the count
       SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
@@ -608,11 +612,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
         Long blockCount = blockRowCountMapping.get(key);
         if (blockCount == null) {
           blockCount = 0L;
-          Long count = segmentAndBlockCountMapping.get(blocklet.getSegmentId());
+          Long count = segmentAndBlockCountMapping.get(segmentId);
           if (count == null) {
             count = 0L;
           }
-          segmentAndBlockCountMapping.put(blocklet.getSegmentId(), count + 1);
+          segmentAndBlockCountMapping.put(segmentId, count + 1);
         }
         blockCount += rowCount;
         blockRowCountMapping.put(key, blockCount);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
index 79458f5..f4d7034 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingIUDTestCase.scala
@@ -3301,7 +3301,12 @@ test("IUD-01-01-02_023-67", Include) {
 //Delete the uniqdata table 
 test("IUD-01-01-02_023-68", Include) {
    sql(s"""use default""").collect
- sql(s"""delete from table uniqdata where segment.id IN(0)""").collect
+ try {
+   sql(s"""delete from table uniqdata where segment.id IN(0)""").collect
+ } catch {
+   case e: Exception =>
+     // ignore as data is already deleted in segment 0
+ }
   checkAnswer(s"""select DOJ from uniqdata where CUST_ID=9001""",
     Seq(Row(Timestamp.valueOf("2012-01-12 03:14:05.0"))), "DataLoadingIUDTestCase_IUD-01-01-02_023-68")
   

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index b027ce2..99a537a 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -18,12 +18,16 @@
 
 package org.apache.carbondata.cluster.sdv.generated
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.common.util._
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -93,35 +97,36 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
     new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0)
-    assert(getMergedIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 1)
+    assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1", true) >= 1)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
   }
 
-  private def getIndexFileCount(dbName: String, tableName: String, segment: String): Int = {
-    getFileCount(dbName, tableName, segment, CarbonTablePath.INDEX_FILE_EXT)
-  }
-
-  private def getMergedIndexFileCount(dbName: String, tableName: String, segment: String): Int = {
-    getFileCount(dbName, tableName, segment, CarbonTablePath.MERGE_INDEX_FILE_EXT)
-  }
-
-  private def getFileCount(dbName: String,
-      tableName: String,
-      segment: String,
-      suffix: String): Int = {
+  private def getIndexFileCount(dbName: String, tableName: String, segmentNo: String, mergeIndexCount: Boolean = false): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
-    val identifier = carbonTable.getAbsoluteTableIdentifier
-    val path = CarbonTablePath
-      .getSegmentPath(identifier.getTablePath, segment)
-    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = {
-        file.getName.endsWith(suffix)
-      }
-    })
-    if (carbonFiles != null) {
-      carbonFiles.length
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+    if (FileFactory.isFileExist(segmentDir)) {
+      val map = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir)
+      map.asScala.map { f =>
+        if (f._2 == null) {
+          1
+        } else {
+          if (mergeIndexCount) 1 else 0
+        }
+      }.sum
     } else {
-      0
+      val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+      if (segment != null) {
+        val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+        store.getSegmentFile.getLocationMap.values().asScala.map { f =>
+          if (f.getMergeFileName == null) {
+            f.getFiles.size()
+          } else {
+            if (mergeIndexCount) 1 else 0
+          }
+        }.sum
+      } else {
+        0
+      }
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index f64a349..6530ec0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -541,37 +541,51 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test lucene fine grain data map with text-match limit") {
-
+    sql("DROP TABLE IF EXISTS datamap_test_limit")
+    sql(
+      """
+        | CREATE TABLE datamap_test_limit(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
     sql(
       s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
+         | CREATE DATAMAP dm ON TABLE datamap_test_limit
          | USING 'lucene'
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    checkAnswer(sql("select count(*) from datamap_test where TEXT_MATCH_WITH_LIMIT('name:n10*',10)"),Seq(Row(10)))
-    checkAnswer(sql("select count(*) from datamap_test where TEXT_MATCH_WITH_LIMIT('name:n10*',50)"),Seq(Row(50)))
-    sql("drop datamap dm on table datamap_test")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_limit OPTIONS('header'='false')")
+    checkAnswer(sql("select count(*) from datamap_test_limit where TEXT_MATCH_WITH_LIMIT('name:n10*',10)"),Seq(Row(10)))
+    checkAnswer(sql("select count(*) from datamap_test_limit where TEXT_MATCH_WITH_LIMIT('name:n10*',50)"),Seq(Row(50)))
+    sql("drop datamap dm on table datamap_test_limit")
   }
 
   test("test lucene fine grain data map with InsertOverwrite") {
+    sql("DROP TABLE IF EXISTS datamap_test_overwrite")
+    sql(
+      """
+        | CREATE TABLE datamap_test_overwrite(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
     sql(
       s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
+         | CREATE DATAMAP dm ON TABLE datamap_test_overwrite
          | USING 'lucene'
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_overwrite OPTIONS('header'='false')")
     sql(
       """
         | CREATE TABLE table1(id INT, name STRING, city STRING, age INT)
         | STORED BY 'carbondata'
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
-    sql("INSERT OVERWRITE TABLE table1 select * from datamap_test where TEXT_MATCH('name:n*')")
+    sql("INSERT OVERWRITE TABLE table1 select * from datamap_test_overwrite where TEXT_MATCH('name:n*')")
     checkAnswer(sql("select count(*) from table1"),Seq(Row(10000)))
-    sql("drop datamap dm on table datamap_test")
-    sql("drop table table1")
+    sql("drop datamap dm on table datamap_test_overwrite")
   }
 
   test("explain query with lucene datamap") {
@@ -715,7 +729,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
       sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
     sql("DROP TABLE IF EXISTS datamap_test5")
   }
-  
+
   test("test text_match on normal table") {
     sql("DROP TABLE IF EXISTS table1")
     sql(
@@ -731,7 +745,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
     assert(msg.getCause.getMessage.contains("TEXT_MATCH is not supported on table"))
     sql("DROP TABLE table1")
   }
-  
+
   test("test lucene with flush_cache as true") {
     sql("DROP TABLE IF EXISTS datamap_test_table")
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 43b215e..688928f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -26,10 +26,11 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
@@ -53,10 +54,8 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
     val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
     val segments: ArrayBuffer[String] = ArrayBuffer()
-    carbonFile.listFiles.foreach { file =>
-      segments += CarbonTablePath.DataFileUtil.getSegmentId(file.getAbsolutePath + "/dummy")
-    }
-    segments.contains(segmentId)
+    val segment = Segment.getSegment(segmentId, carbonTable.getAbsoluteTableIdentifier.getTablePath)
+    segment != null
   }
 
   test("test data loading CSV file") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 4860b32..8487b9e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -16,17 +16,13 @@
  */
 package org.apache.carbondata.spark.testsuite.allqueries
 
-import java.io.File
-
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   var timeStampPropOrig: String = _
@@ -227,11 +223,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("insert overwrite table CarbonOverwrite select * from THive")
     sql("insert overwrite table HiveOverwrite select * from THive")
     checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite")
-    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getAbsoluteTableIdentifier.getTablePath)
-    val folder = new File(partitionPath)
-    assert(folder.isDirectory)
-    assert(folder.list().length == 1)
+    assert(checkSegment("CarbonOverwrite"))
   }
 
   test("Load overwrite") {
@@ -249,12 +241,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
     sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' overwrite INTO TABLE HiveOverwrite")
     checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite")
-    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getAbsoluteTableIdentifier.getTablePath)
-    val folder = new File(partitionPath)
-
-    assert(folder.isDirectory)
-    assert(folder.list().length == 1)
+    assert(checkSegment("TCarbonSourceOverwrite"))
   }
 
   test("Load overwrite fail handle") {
@@ -379,15 +366,9 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
 
 
   private def checkSegment(tableName: String) : Boolean ={
-    val storePath_t1 = s"$storeLocation/${tableName.toLowerCase()}/Fact/Part0"
-    val carbonFile_t1: CarbonFile = FileFactory
-      .getCarbonFile(storePath_t1, FileFactory.getFileType(storePath_t1))
-    var exists: Boolean = carbonFile_t1.exists()
-    if (exists) {
-      val listFiles: Array[CarbonFile] = carbonFile_t1.listFiles()
-      exists = listFiles.size > 0
-    }
-    exists
+    val storePath_t1 = s"$storeLocation/${tableName.toLowerCase()}"
+    val detailses = SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(storePath_t1))
+    detailses.map(_.getSegmentStatus == SegmentStatus.SUCCESS).exists(f => f)
   }
 
   test("test show segments after clean files for insert overwrite") {