Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:19:12 UTC

[15/16] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
deleted file mode 100644
index a0ce24a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.metadata;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
-
-import com.google.gson.Gson;
-
-/**
- * Provide read and write support for the partition mapping file in each segment
- */
-public class PartitionMapFileStore {
-
-  private Map<String, List<String>> partitionMap = new HashMap<>();
-
-  private boolean partionedSegment = false;
-  /**
-   * Write the partition map file to the segment folder with the index file name and corresponding partitions.
-   *
-   * @param segmentPath
-   * @param taskNo
-   * @param partionNames
-   * @throws IOException
-   */
-  public void writePartitionMapFile(String segmentPath, final String taskNo,
-      List<String> partionNames) throws IOException {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
-    // write partition info to new file.
-    if (carbonFile.exists() && partionNames.size() > 0) {
-      CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile file) {
-          return file.getName().startsWith(taskNo) && file.getName()
-              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
-        }
-      });
-      if (carbonFiles != null && carbonFiles.length > 0) {
-        PartitionMapper partitionMapper = new PartitionMapper();
-        Map<String, List<String>> partitionMap = new HashMap<>();
-        partitionMap.put(carbonFiles[0].getName(), partionNames);
-        partitionMapper.setPartitionMap(partitionMap);
-        String path = segmentPath + "/" + taskNo + CarbonTablePath.PARTITION_MAP_EXT;
-        writePartitionFile(partitionMapper, path);
-      }
-    }
-  }
-
-  private void writePartitionFile(PartitionMapper partitionMapper, String path) throws IOException {
-    AtomicFileOperations fileWrite =
-        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
-    BufferedWriter brWriter = null;
-    DataOutputStream dataOutputStream = null;
-    Gson gsonObjectToWrite = new Gson();
-    try {
-      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(partitionMapper);
-      brWriter.write(metadataInstance);
-    } finally {
-      if (null != brWriter) {
-        brWriter.flush();
-      }
-      CarbonUtil.closeStreams(brWriter);
-      fileWrite.close();
-    }
-  }
-
-  /**
-   * Merge all partition files in a segment into a single file.
-   *
-   * @param segmentPath
-   * @throws IOException
-   */
-  public void mergePartitionMapFiles(String segmentPath, String mergeFileName) throws IOException {
-    CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
-    if (partitionFiles != null && partitionFiles.length > 0) {
-      PartitionMapper partitionMapper = null;
-      for (CarbonFile file : partitionFiles) {
-        PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath());
-        if (partitionMapper == null && localMapper != null) {
-          partitionMapper = localMapper;
-        }
-        if (localMapper != null) {
-          partitionMapper = partitionMapper.merge(localMapper);
-        }
-      }
-      if (partitionMapper != null) {
-        String path = segmentPath + "/" + mergeFileName + CarbonTablePath.PARTITION_MAP_EXT;
-        writePartitionFile(partitionMapper, path);
-        for (CarbonFile file : partitionFiles) {
-          if (!FileFactory.deleteAllCarbonFilesOfDir(file)) {
-            throw new IOException("Old partition map files cannot be deleted");
-          }
-        }
-      }
-    }
-  }
-
-  private String getPartitionFilePath(String segmentPath) {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
-    if (carbonFile.exists()) {
-      CarbonFile[] partitionFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
-        }
-      });
-      if (partitionFiles != null && partitionFiles.length > 0) {
-        partionedSegment = true;
-        int i = 0;
-        // Get the latest partition map file based on the timestamp of that file.
-        long[] partitionTimestamps = new long[partitionFiles.length];
-        for (CarbonFile file : partitionFiles) {
-          partitionTimestamps[i++] = Long.parseLong(file.getName()
-              .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
-        }
-        Arrays.sort(partitionTimestamps);
-        return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
-            + CarbonTablePath.PARTITION_MAP_EXT;
-      }
-    }
-    return null;
-  }
-
-  private String getPartitionFilePath(CarbonFile[] carbonFiles, String segmentPath) {
-
-    List<CarbonFile> partitionFiles = new ArrayList<>();
-    for (CarbonFile file : carbonFiles) {
-      if (file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT)) {
-        partitionFiles.add(file);
-      }
-    }
-    if (partitionFiles.size() > 0) {
-      partionedSegment = true;
-      int i = 0;
-      // Get the latest partition map file based on the timestamp of that file.
-      long[] partitionTimestamps = new long[partitionFiles.size()];
-      for (CarbonFile file : partitionFiles) {
-        partitionTimestamps[i++] = Long.parseLong(file.getName()
-            .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
-      }
-      Arrays.sort(partitionTimestamps);
-      return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
-          + CarbonTablePath.PARTITION_MAP_EXT;
-    }
-    return null;
-  }
-
-  private CarbonFile[] getPartitionFiles(String segmentPath) {
-    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
-    if (carbonFile.exists()) {
-      return carbonFile.listFiles(new CarbonFileFilter() {
-        @Override public boolean accept(CarbonFile file) {
-          return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
-        }
-      });
-    }
-    return null;
-  }
-
-  /**
-   * This method reads the partition file
-   *
-   * @param partitionMapPath
-   * @return
-   */
-  private PartitionMapper readPartitionMap(String partitionMapPath) throws IOException {
-    Gson gsonObjectToRead = new Gson();
-    DataInputStream dataInputStream = null;
-    BufferedReader buffReader = null;
-    InputStreamReader inStream = null;
-    PartitionMapper partitionMapper;
-    AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(partitionMapPath, FileFactory.getFileType(partitionMapPath));
-
-    try {
-      if (!FileFactory.isFileExist(partitionMapPath, FileFactory.getFileType(partitionMapPath))) {
-        return null;
-      }
-      dataInputStream = fileOperation.openForRead();
-      inStream = new InputStreamReader(dataInputStream,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-      buffReader = new BufferedReader(inStream);
-      partitionMapper = gsonObjectToRead.fromJson(buffReader, PartitionMapper.class);
-    } finally {
-      if (inStream != null) {
-        CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
-      }
-    }
-
-    return partitionMapper;
-  }
-
-  /**
-   * Reads all partitions which existed inside the passed segment path
-   * @param segmentPath
-   */
-  public void readAllPartitionsOfSegment(String segmentPath) throws IOException {
-    String partitionFilePath = getPartitionFilePath(segmentPath);
-    if (partitionFilePath != null) {
-      partionedSegment = true;
-      PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
-      partitionMap.putAll(partitionMapper.getPartitionMap());
-    }
-  }
-
-  /**
-   * Reads all partitions which existed inside the passed segment path
-   * @param carbonFiles
-   */
-  public void readAllPartitionsOfSegment(CarbonFile[] carbonFiles, String segmentPath)
-      throws IOException {
-    String partitionFilePath = getPartitionFilePath(carbonFiles, segmentPath);
-    if (partitionFilePath != null) {
-      partionedSegment = true;
-      PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
-      partitionMap.putAll(partitionMapper.getPartitionMap());
-    }
-  }
-
-  public boolean isPartionedSegment() {
-    return partionedSegment;
-  }
-
-  /**
-   * Drops the partitions from the partition mapper file of the segment and writes to a new file.
-   * @param segmentPath
-   * @param partitionsToDrop
-   * @param uniqueId
-   * @param partialMatch  If true, entries that match the given partition spec only
-   *                      partially are also dropped
-   * @throws IOException
-   */
-  public void dropPartitions(String segmentPath, List<List<String>> partitionsToDrop,
-      String uniqueId, boolean partialMatch) throws IOException {
-    readAllPartitionsOfSegment(segmentPath);
-    List<String> indexesToDrop = new ArrayList<>();
-    for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) {
-      for (List<String> partitions: partitionsToDrop) {
-        if (partialMatch) {
-          if (entry.getValue().containsAll(partitions)) {
-            indexesToDrop.add(entry.getKey());
-          }
-        } else {
-          if (partitions.containsAll(entry.getValue())) {
-            indexesToDrop.add(entry.getKey());
-          }
-        }
-      }
-    }
-    if (indexesToDrop.size() > 0) {
-      // Remove the indexes from partition map
-      for (String indexToDrop : indexesToDrop) {
-        partitionMap.remove(indexToDrop);
-      }
-      PartitionMapper mapper = new PartitionMapper();
-      mapper.setPartitionMap(partitionMap);
-      String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp";
-      writePartitionFile(mapper, path);
-    }
-  }
-
-  /**
-   * Deletes the old partition mapper files in case of success, and in case of failure
-   * removes the newly written file.
-   * @param segmentPath
-   * @param uniqueId
-   * @param success
-   */
-  public void commitPartitions(String segmentPath, final String uniqueId, boolean success,
-      String tablePath, List<String> partitionsToDrop) {
-    CarbonFile carbonFile = FileFactory
-        .getCarbonFile(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp");
-    CarbonFile carbonPartFile = FileFactory
-        .getCarbonFile(tablePath + "/" + partitionsToDrop.get(0));
-    // write partition info to new file.
-    if (carbonFile.exists()) {
-      if (success) {
-        carbonFile.renameForce(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT);
-      } else {
-        carbonFile.delete();
-      }
-    }
-    //Remove the partition directory from table path
-    carbonPartFile.delete();
-  }
-
-  /**
-   * Clean up invalid data after a drop-partition operation in all segments of the table
-   * @param table
-   * @param currentPartitions Current partitions of the table
-   * @param forceDelete Whether to delete the data forcefully, or only after the maximum query
-   *                    timeout has elapsed since the file creation time.
-   * @throws IOException
-   */
-  public void cleanSegments(
-      CarbonTable table,
-      List<String> currentPartitions,
-      boolean forceDelete) throws IOException {
-    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
-            table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
-
-    LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
-    // scan through each segment.
-    List<String> segmentsNeedToBeDeleted = new ArrayList<>();
-    for (LoadMetadataDetails segment : details) {
-
-      // only if this segment is valid do we go for deletion of the related
-      // dropped partition files. if the segment is marked for delete or compacted then
-      // it will get deleted anyway.
-
-      if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
-          || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-        List<String> toBeDeletedIndexFiles = new ArrayList<>();
-        List<String> toBeDeletedDataFiles = new ArrayList<>();
-        // take the list of files from this segment.
-        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
-        String partitionFilePath = getPartitionFilePath(segmentPath);
-        if (partitionFilePath != null) {
-          PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
-          if (partitionMapper.partitionMap.size() == 0) {
-            // There is no partition information, it means all partitions are dropped.
-            // So segment need to be marked as delete.
-            segmentsNeedToBeDeleted.add(segment.getLoadName());
-            continue;
-          }
-          DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
-          SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-          indexFileStore.readAllIIndexOfSegment(segmentPath);
-          Set<String> indexFilesFromSegment = indexFileStore.getCarbonIndexMap().keySet();
-          for (String indexFile : indexFilesFromSegment) {
-            // Check the partition information in the partition mapper
-            List<String> indexPartitions = partitionMapper.partitionMap.get(indexFile);
-            if (indexPartitions == null || !currentPartitions.containsAll(indexPartitions)) {
-              Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
-                  .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
-                      indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
-              if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
-                toBeDeletedIndexFiles.add(indexFile);
-                // Add the corresponding carbondata files to the delete list.
-                byte[] fileData = indexFileStore.getFileData(indexFile);
-                List<DataFileFooter> indexInfo =
-                    fileFooterConverter.getIndexInfo(segmentPath + "/" + indexFile, fileData);
-                for (DataFileFooter footer : indexInfo) {
-                  toBeDeletedDataFiles.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
-                }
-              }
-            }
-          }
-
-          if (toBeDeletedIndexFiles.size() > 0) {
-            indexFilesFromSegment.removeAll(toBeDeletedIndexFiles);
-            new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath,
-                new ArrayList<String>(indexFilesFromSegment));
-            for (String dataFile : toBeDeletedDataFiles) {
-              FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
-            }
-          }
-          CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
-          CarbonFile currentPartitionFile = FileFactory.getCarbonFile(partitionFilePath);
-          if (partitionFiles != null) {
-            // Delete all old partition files
-            for (CarbonFile partitionFile : partitionFiles) {
-              if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) {
-                long fileTimeStamp = Long.parseLong(partitionFile.getName().substring(0,
-                    partitionFile.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
-                if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimeStamp) || forceDelete) {
-                  partitionFile.delete();
-                }
-              }
-            }
-          }
-          partitionMapper = readPartitionMap(partitionFilePath);
-          if (partitionMapper != null) {
-            // delete the partition map file if it contains no partitions
-            if (partitionMapper.partitionMap.size() == 0) {
-              currentPartitionFile.delete();
-            }
-          }
-        }
-      }
-    }
-    // Mark any segments that need to be deleted
-    if (segmentsNeedToBeDeleted.size() > 0) {
-      try {
-        // Mark the segments as delete.
-        SegmentStatusManager.updateDeletionStatus(
-            table.getAbsoluteTableIdentifier(),
-            segmentsNeedToBeDeleted,
-            table.getMetaDataFilepath());
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-  public List<String> getPartitions(String indexFileName) {
-    return partitionMap.get(indexFileName);
-  }
-
-  public Map<String, List<String>> getPartitionMap() {
-    return partitionMap;
-  }
-
-  public static class PartitionMapper implements Serializable {
-
-    private static final long serialVersionUID = 3582245668420401089L;
-
-    private Map<String, List<String>> partitionMap;
-
-    public PartitionMapper merge(PartitionMapper mapper) {
-      if (this == mapper) {
-        return this;
-      }
-      if (partitionMap != null && mapper.partitionMap != null) {
-        partitionMap.putAll(mapper.partitionMap);
-      }
-      if (partitionMap == null) {
-        partitionMap = mapper.partitionMap;
-      }
-      return this;
-    }
-
-    public Map<String, List<String>> getPartitionMap() {
-      return partitionMap;
-    }
-
-    public void setPartitionMap(Map<String, List<String>> partitionMap) {
-      this.partitionMap = partitionMap;
-    }
-  }
-
-}
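
For context on the format being removed: each task persisted one small Gson JSON document
mapping its index file name to the partition values it wrote, stored in the segment folder
with the CarbonTablePath.PARTITION_MAP_EXT extension. A minimal standalone sketch of that
shape (the index file name and partition values are illustrative assumptions, not taken from
a real store):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.gson.Gson;

public class PartitionMapFormatSketch {

  // Mirrors PartitionMapFileStore.PartitionMapper: index file name -> partitions.
  static class PartitionMapper {
    Map<String, List<String>> partitionMap;
  }

  public static void main(String[] args) {
    PartitionMapper mapper = new PartitionMapper();
    mapper.partitionMap = new HashMap<>();
    // Hypothetical index file written by task 0 with two partition values.
    mapper.partitionMap.put("0-0-1519714752000.carbonindex",
        Arrays.asList("country=India", "city=Bangalore"));
    // writePartitionFile() above serialized exactly this shape.
    System.out.println(new Gson().toJson(mapper));
    // -> {"partitionMap":{"0-0-1519714752000.carbonindex":["country=India","city=Bangalore"]}}
  }
}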

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
new file mode 100644
index 0000000..b5f5a25
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for the segment file associated with each segment
+ */
+public class SegmentFileStore {
+
+  private SegmentFile segmentFile;
+
+  /**
+   * Here key folder path and values are index files in it.
+   */
+  private Map<String, List<String>> indexFilesMap;
+
+  private String tablePath;
+
+  public SegmentFileStore(String tablePath, String segmentFileName) throws IOException {
+    this.tablePath = tablePath;
+    this.segmentFile = readSegment(tablePath, segmentFileName);
+  }
+
+  /**
+   * Write segment information to the segment folder with the index file names and
+   * corresponding partitions.
+   */
+  public static void writeSegmentFile(String tablePath, final String taskNo, String location,
+      String timeStamp, List<String> partionNames) throws IOException {
+    String tempFolderLoc = timeStamp + ".tmp";
+    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
+    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+    if (!carbonFile.exists()) {
+      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+    }
+    CarbonFile tempFolder =
+        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+
+    if (tempFolder.exists() && partionNames.size() > 0) {
+      CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().startsWith(taskNo) && file.getName()
+              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+        }
+      });
+      if (carbonFiles != null && carbonFiles.length > 0) {
+        boolean isRelative = false;
+        if (location.startsWith(tablePath)) {
+          location = location.substring(tablePath.length(), location.length());
+          isRelative = true;
+        }
+        SegmentFile segmentFile = new SegmentFile();
+        Map<String, FolderDetails> locationMap = new HashMap<>();
+        FolderDetails folderDetails = new FolderDetails();
+        folderDetails.setRelative(isRelative);
+        folderDetails.setPartitions(partionNames);
+        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+        for (CarbonFile file : carbonFiles) {
+          folderDetails.getFiles().add(file.getName());
+        }
+        locationMap.put(location, folderDetails);
+        segmentFile.setLocationMap(locationMap);
+        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+        // write segment info to new file.
+        writeSegmentFile(segmentFile, path);
+      }
+    }
+  }
+
+  /**
+   * Writes the segment file in json format
+   * @param segmentFile
+   * @param path
+   * @throws IOException
+   */
+  public static void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+    AtomicFileOperations fileWrite =
+        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+    BufferedWriter brWriter = null;
+    DataOutputStream dataOutputStream = null;
+    Gson gsonObjectToWrite = new Gson();
+    try {
+      dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+      brWriter.write(metadataInstance);
+      brWriter.flush();
+    } finally {
+      CarbonUtil.closeStreams(brWriter);
+      fileWrite.close();
+    }
+  }
+
+  /**
+   * Merge all segment files in a segment into a single file.
+   *
+   * @throws IOException
+   */
+  public static SegmentFile mergeSegmentFiles(String readPath, String mergeFileName,
+      String writePath)
+      throws IOException {
+    CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+    if (segmentFiles != null && segmentFiles.length > 0) {
+      SegmentFile segmentFile = null;
+      for (CarbonFile file : segmentFiles) {
+        SegmentFile localSegmentFile = readSegmentFile(file.getAbsolutePath());
+        if (segmentFile == null && localSegmentFile != null) {
+          segmentFile = localSegmentFile;
+        }
+        if (localSegmentFile != null) {
+          segmentFile = segmentFile.merge(localSegmentFile);
+        }
+      }
+      if (segmentFile != null) {
+        String path = writePath + "/" + mergeFileName + CarbonTablePath.SEGMENT_EXT;
+        writeSegmentFile(segmentFile, path);
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+      }
+      return segmentFile;
+    }
+    return null;
+  }
+
+  private static CarbonFile[] getSegmentFiles(String segmentPath) {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+    if (carbonFile.exists()) {
+      return carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+        }
+      });
+    }
+    return null;
+  }
+
+  /**
+   * It provides a segment file only for the partitions which have physical index files.
+   *
+   * @param partitionSpecs
+   */
+  public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
+      List<PartitionSpec> partitionSpecs) {
+    SegmentFile segmentFile = null;
+    for (PartitionSpec spec : partitionSpecs) {
+      String location = spec.getLocation().toString();
+      CarbonFile carbonFile = FileFactory.getCarbonFile(location);
+
+      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
+        }
+      });
+      if (listFiles != null && listFiles.length > 0) {
+        boolean isRelative = false;
+        if (location.startsWith(tablePath)) {
+          location = location.substring(tablePath.length(), location.length());
+          isRelative = true;
+        }
+        SegmentFile localSegmentFile = new SegmentFile();
+        Map<String, FolderDetails> locationMap = new HashMap<>();
+        FolderDetails folderDetails = new FolderDetails();
+        folderDetails.setRelative(isRelative);
+        folderDetails.setPartitions(spec.getPartitions());
+        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+        for (CarbonFile file : listFiles) {
+          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+            folderDetails.setMergeFileName(file.getName());
+          } else {
+            folderDetails.getFiles().add(file.getName());
+          }
+        }
+        locationMap.put(location, folderDetails);
+        localSegmentFile.setLocationMap(locationMap);
+        if (segmentFile == null) {
+          segmentFile = localSegmentFile;
+        } else {
+          segmentFile = segmentFile.merge(localSegmentFile);
+        }
+      }
+    }
+    return segmentFile;
+  }
+
+  /**
+   * This method reads the segment file which is written in json format
+   *
+   * @param segmentFilePath
+   * @return
+   */
+  private static SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    SegmentFile segmentFile;
+    AtomicFileOperations fileOperation =
+        new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+
+    try {
+      if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
+        return null;
+      }
+      dataInputStream = fileOperation.openForRead();
+      inStream = new InputStreamReader(dataInputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      buffReader = new BufferedReader(inStream);
+      segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
+    } finally {
+      if (inStream != null) {
+        CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+      }
+    }
+
+    return segmentFile;
+  }
+
+  /**
+   * Reads segment file.
+   */
+  private SegmentFile readSegment(String tablePath, String segmentFileName) throws IOException {
+    String segmentFilePath =
+        CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+            + segmentFileName;
+    return readSegmentFile(segmentFilePath);
+  }
+
+  public String getTablePath() {
+    return tablePath;
+  }
+
+  /**
+   * Gets all the index files and related carbondata files from this segment. The
+   * readIndexFiles method must be called first.
+   * @return
+   */
+  public Map<String, List<String>> getIndexFilesMap() {
+    return indexFilesMap;
+  }
+
+  /**
+   * Reads all index files which are located in this segment. The readSegment method
+   * must be called first.
+   * @throws IOException
+   */
+  public void readIndexFiles() throws IOException {
+    readIndexFiles(SegmentStatus.SUCCESS, false);
+  }
+
+  /**
+   * Reads the index files whose containing folder has the given status. If ignoreStatus is
+   * true it reads all index files regardless of status.
+   * @param status
+   * @param ignoreStatus
+   * @throws IOException
+   */
+  private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
+    if (indexFilesMap != null) {
+      return;
+    }
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    indexFilesMap = new HashMap<>();
+    indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+      List<DataFileFooter> indexInfo =
+          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+      List<String> blocks = new ArrayList<>();
+      for (DataFileFooter footer : indexInfo) {
+        blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+      }
+      indexFilesMap.put(entry.getKey(), blocks);
+    }
+  }
+
+  /**
+   * Gets all index files from this segment
+   * @return
+   */
+  public Map<String, String> getIndexFiles() {
+    Map<String, String> indexFiles = new HashMap<>();
+    if (segmentFile != null) {
+      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
+        String location = entry.getKey();
+        if (entry.getValue().isRelative) {
+          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+        }
+        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
+          for (String indexFile : entry.getValue().getFiles()) {
+            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
+                entry.getValue().mergeFileName);
+          }
+        }
+      }
+    }
+    return indexFiles;
+  }
+
+  /**
+   * Drops the partition related entries from the segment file of the segment and writes
+   * them to a new file. It first iterates over the segment file locations, checks whether
+   * each path needs to be dropped, and updates the status to marked-for-delete on a match.
+   *
+   * @param uniqueId
+   * @throws IOException
+   */
+  public void dropPartitions(Segment segment, List<PartitionSpec> partitionSpecs,
+      String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
+      throws IOException {
+    readSegment(tablePath, segment.getSegmentFileName());
+    boolean updateSegment = false;
+    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+      String location = entry.getKey();
+      if (entry.getValue().isRelative) {
+        location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+      }
+      Path path = new Path(location);
+      // Update the status to delete if path equals
+      for (PartitionSpec spec : partitionSpecs) {
+        if (path.equals(spec.getLocation())) {
+          entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
+          updateSegment = true;
+          break;
+        }
+      }
+    }
+    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
+    writePath =
+        writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
+            + CarbonTablePath.SEGMENT_EXT;
+    writeSegmentFile(segmentFile, writePath);
+    // Check whether we can completely remove the segment.
+    boolean deleteSegment = true;
+    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+      if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+        deleteSegment = false;
+        break;
+      }
+    }
+    if (deleteSegment) {
+      toBeDeletedSegments.add(segment.getSegmentNo());
+    }
+    if (updateSegment) {
+      toBeUpdatedSegments.add(segment.getSegmentNo());
+    }
+  }
+
+  /**
+   * Update the table status file with the dropped partitions information
+   *
+   * @param carbonTable
+   * @param uniqueId
+   * @param toBeUpdatedSegments
+   * @param toBeDeleteSegments
+   * @throws IOException
+   */
+  public static void commitDropPartitions(CarbonTable carbonTable, String uniqueId,
+      List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments) throws IOException {
+    if (toBeDeleteSegments.size() > 0 || toBeUpdatedSegments.size() > 0) {
+      Set<Segment> segmentSet = new HashSet<>(
+          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
+              .getValidAndInvalidSegments().getValidSegments());
+      CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
+          Segment.toSegmentList(toBeDeleteSegments), Segment.toSegmentList(toBeUpdatedSegments));
+    }
+  }
+
+  /**
+   * Clean up invalid data after a drop-partition operation in all segments of the table
+   *
+   * @param table
+   * @param forceDelete Whether to delete the data forcefully, or only after the maximum query
+   *                    timeout has elapsed since the file creation time.
+   * @throws IOException
+   */
+  public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitionSpecs,
+      boolean forceDelete) throws IOException {
+
+    LoadMetadataDetails[] details =
+        SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath());
+    // scan through each segment.
+    for (LoadMetadataDetails segment : details) {
+      // only if this segment is valid do we go for deletion of the related
+      // dropped partition files. if the segment is marked for delete or compacted then
+      // it will get deleted anyway.
+
+      if ((segment.getSegmentStatus() == SegmentStatus.SUCCESS
+          || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS)
+          && segment.getSegmentFile() != null) {
+        List<String> toBeDeletedIndexFiles = new ArrayList<>();
+        List<String> toBeDeletedDataFiles = new ArrayList<>();
+        // take the list of files from this segment.
+        SegmentFileStore fileStore =
+            new SegmentFileStore(table.getTablePath(), segment.getSegmentFile());
+        fileStore.readIndexFiles(SegmentStatus.MARKED_FOR_DELETE, false);
+        if (forceDelete) {
+          deletePhysicalPartition(partitionSpecs, fileStore.getIndexFilesMap());
+        }
+        for (Map.Entry<String, List<String>> entry : fileStore.indexFilesMap.entrySet()) {
+          String indexFile = entry.getKey();
+          // Get the file timestamp encoded in the index file name.
+          Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+              .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                  indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
+          if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+            toBeDeletedIndexFiles.add(indexFile);
+            // Add the corresponding carbondata files to the delete list.
+            toBeDeletedDataFiles.addAll(entry.getValue());
+          }
+        }
+        if (toBeDeletedIndexFiles.size() > 0) {
+          for (String dataFile : toBeDeletedIndexFiles) {
+            FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+          }
+          for (String dataFile : toBeDeletedDataFiles) {
+            FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Deletes the segment file and its physical files like partition folders from disk
+   * @param tablePath
+   * @param segmentFile
+   * @param partitionSpecs
+   * @throws IOException
+   */
+  public static void deleteSegment(String tablePath, String segmentFile,
+      List<PartitionSpec> partitionSpecs) throws IOException {
+    SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile);
+    fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
+    Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+    for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));
+      for (String file : entry.getValue()) {
+        FileFactory.deleteFile(file, FileFactory.getFileType(file));
+      }
+    }
+    deletePhysicalPartition(partitionSpecs, indexFilesMap);
+    String segmentFilePath =
+        CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+            + segmentFile;
+    // Deletes the physical segment file
+    FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+  }
+
+  private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
+      Map<String, List<String>> locationMap) {
+    for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
+      Path location = new Path(entry.getKey()).getParent();
+      boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
+      if (!exists) {
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+      }
+    }
+  }
+
+  private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
+      Path partitionPath) {
+    for (PartitionSpec spec : partitionSpecs) {
+      if (spec.getLocation().equals(partitionPath)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get the partition specs of the segment
+   * @param segmentId
+   * @param tablePath
+   * @return
+   * @throws IOException
+   */
+  public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath)
+      throws IOException {
+    LoadMetadataDetails segEntry = null;
+    LoadMetadataDetails[] details =
+        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
+    for (LoadMetadataDetails entry : details) {
+      if (entry.getLoadName().equals(segmentId)) {
+        segEntry = entry;
+        break;
+      }
+    }
+    if (segEntry != null && segEntry.getSegmentFile() != null) {
+      SegmentFileStore fileStore = new SegmentFileStore(tablePath, segEntry.getSegmentFile());
+      List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs();
+      for (PartitionSpec spec : partitionSpecs) {
+        spec.setUuid(segmentId + "_" + segEntry.getLoadStartTime());
+      }
+      return partitionSpecs;
+    }
+    return null;
+  }
+
+  /**
+   * Move the loaded data from the temp folder to the respective partition folders.
+   * @param segmentFile
+   * @param tmpFolder
+   * @param tablePath
+   */
+  public static void moveFromTempFolder(SegmentFile segmentFile, String tmpFolder,
+      String tablePath) {
+
+    for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : segmentFile.getLocationMap()
+        .entrySet()) {
+      String location = entry.getKey();
+      if (entry.getValue().isRelative()) {
+        location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+      }
+      CarbonFile oldFolder =
+          FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder);
+      CarbonFile[] oldFiles = oldFolder.listFiles();
+      for (CarbonFile file : oldFiles) {
+        file.renameForce(location + CarbonCommonConstants.FILE_SEPARATOR + file.getName());
+      }
+      oldFolder.delete();
+    }
+  }
+
+  /**
+   * Remove the temp stage folder in case the job is aborted.
+   *
+   * @param locationMap
+   * @param tmpFolder
+   * @param tablePath
+   */
+  public static void removeTempFolder(Map<String, FolderDetails> locationMap, String tmpFolder,
+      String tablePath) {
+    if (locationMap == null) {
+      return;
+    }
+    for (Map.Entry<String, SegmentFileStore.FolderDetails> entry : locationMap.entrySet()) {
+      String location = entry.getKey();
+      if (entry.getValue().isRelative()) {
+        location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+      }
+      CarbonFile oldFolder =
+          FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tmpFolder);
+      if (oldFolder.exists()) {
+        FileFactory.deleteAllCarbonFilesOfDir(oldFolder);
+      }
+    }
+  }
+
+  /**
+   * Returns the content of the segment
+   * @return
+   */
+  public Map<String, FolderDetails> getLocationMap() {
+    if (segmentFile == null) {
+      return new HashMap<>();
+    }
+    return segmentFile.getLocationMap();
+  }
+
+  /**
+   * Returns the current partition specs of this segment
+   * @return
+   */
+  public List<PartitionSpec> getPartitionSpecs() {
+    List<PartitionSpec> partitionSpecs = new ArrayList<>();
+    if (segmentFile != null) {
+      for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
+        String location = entry.getKey();
+        if (entry.getValue().isRelative) {
+          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
+        }
+        if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
+          partitionSpecs.add(new PartitionSpec(entry.getValue().partitions, location));
+        }
+      }
+    }
+    return partitionSpecs;
+  }
+
+  /**
+   * It contains the segment information like location, partitions and related index files
+   */
+  public static class SegmentFile implements Serializable {
+
+    private static final long serialVersionUID = 3582245668420401089L;
+
+    private Map<String, FolderDetails> locationMap;
+
+    public SegmentFile merge(SegmentFile mapper) {
+      if (this == mapper) {
+        return this;
+      }
+      if (locationMap != null && mapper.locationMap != null) {
+        for (Map.Entry<String, FolderDetails> entry : mapper.locationMap.entrySet()) {
+          FolderDetails folderDetails = locationMap.get(entry.getKey());
+          if (folderDetails != null) {
+            folderDetails.merge(entry.getValue());
+          } else {
+            locationMap.put(entry.getKey(), entry.getValue());
+          }
+        }
+      }
+      if (locationMap == null) {
+        locationMap = mapper.locationMap;
+      }
+      return this;
+    }
+
+    public Map<String, FolderDetails> getLocationMap() {
+      return locationMap;
+    }
+
+    public void setLocationMap(Map<String, FolderDetails> locationMap) {
+      this.locationMap = locationMap;
+    }
+  }
+
+  /**
+   * Represents one partition folder
+   */
+  public static class FolderDetails implements Serializable {
+
+    private static final long serialVersionUID = 501021868886928553L;
+
+    private Set<String> files = new HashSet<>();
+
+    private List<String> partitions = new ArrayList<>();
+
+    private String status;
+
+    private String mergeFileName;
+
+    private boolean isRelative;
+
+    public FolderDetails merge(FolderDetails folderDetails) {
+      if (this == folderDetails || folderDetails == null) {
+        return this;
+      }
+      if (folderDetails.files != null) {
+        files.addAll(folderDetails.files);
+      }
+      if (files == null) {
+        files = folderDetails.files;
+      }
+      partitions = folderDetails.partitions;
+      return this;
+    }
+
+    public Set<String> getFiles() {
+      return files;
+    }
+
+    public void setFiles(Set<String> files) {
+      this.files = files;
+    }
+
+    public List<String> getPartitions() {
+      return partitions;
+    }
+
+    public void setPartitions(List<String> partitions) {
+      this.partitions = partitions;
+    }
+
+    public boolean isRelative() {
+      return isRelative;
+    }
+
+    public void setRelative(boolean relative) {
+      isRelative = relative;
+    }
+
+    public String getMergeFileName() {
+      return mergeFileName;
+    }
+
+    public void setMergeFileName(String mergeFileName) {
+      this.mergeFileName = mergeFileName;
+    }
+
+    public String getStatus() {
+      return status;
+    }
+
+    public void setStatus(String status) {
+      this.status = status;
+    }
+  }
+}
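
The replacement format is again plain Gson JSON, but keyed by partition folder location
instead of index file name. A standalone sketch of the layout that writeSegmentFile above
persists (the folder location, index file name and "Success" status string are illustrative
assumptions; "Success" stands in for SegmentStatus.SUCCESS.getMessage()):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.gson.GsonBuilder;

public class SegmentFileFormatSketch {

  // Mirrors SegmentFileStore.FolderDetails for one partition folder.
  static class FolderDetails {
    Set<String> files = new HashSet<>();
    List<String> partitions = new ArrayList<>();
    String status;
    String mergeFileName;
    boolean isRelative;
  }

  // Mirrors SegmentFileStore.SegmentFile: folder location -> folder details.
  static class SegmentFile {
    Map<String, FolderDetails> locationMap = new HashMap<>();
  }

  public static void main(String[] args) {
    FolderDetails details = new FolderDetails();
    details.isRelative = true; // the location below is relative to the table path
    details.partitions.addAll(Arrays.asList("country=India"));
    details.files.add("0-0-1519714752000.carbonindex");
    details.status = "Success";
    SegmentFile segmentFile = new SegmentFile();
    segmentFile.locationMap.put("/country=India", details);
    System.out.println(new GsonBuilder().setPrettyPrinting().create().toJson(segmentFile));
  }
}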

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index c5f61c2..de98fa8 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -82,14 +83,16 @@ public class CarbonUpdateUtil {
    * @param factPath
    * @return
    */
-  public static String getTableBlockPath(String tid, String factPath) {
-    String part =
-            CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
+  public static String getTableBlockPath(String tid, String factPath, boolean isPartitionTable) {
+    String partField = getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID);
+    if (isPartitionTable) {
+      return factPath + CarbonCommonConstants.FILE_SEPARATOR + partField;
+    }
+    String part = CarbonTablePath.addPartPrefix(partField);
     String segment =
             CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
     return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
             + CarbonCommonConstants.FILE_SEPARATOR + segment;
-
   }
 
   /**
@@ -172,6 +175,22 @@ public class CarbonUpdateUtil {
   }
 
   /**
+   * Update table status
+   * @param updatedSegmentsList
+   * @param table
+   * @param updatedTimeStamp
+   * @param isTimestampUpdationRequired
+   * @param segmentsToBeDeleted
+   * @return
+   */
+  public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
+      CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdationRequired,
+      List<Segment> segmentsToBeDeleted) {
+    return updateTableMetadataStatus(updatedSegmentsList, table, updatedTimeStamp,
+        isTimestampUpdationRequired, segmentsToBeDeleted, new ArrayList<Segment>());
+  }
+
+  /**
    *
    * @param updatedSegmentsList
    * @param table
@@ -180,10 +199,9 @@ public class CarbonUpdateUtil {
    * @param segmentsToBeDeleted
    * @return
    */
-  public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
-                                                  CarbonTable table, String updatedTimeStamp,
-                                                  boolean isTimestampUpdationRequired,
-                                                  List<String> segmentsToBeDeleted) {
+  public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
+      CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdationRequired,
+      List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) {
 
     boolean status = false;
 
@@ -221,13 +239,13 @@ public class CarbonUpdateUtil {
             }
 
            // if the segment is in the marked-for-delete list then update the status.
-            if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
+            if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName(), null))) {
               loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
               loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
             }
           }
-          for (String segName : updatedSegmentsList) {
-            if (loadMetadata.getLoadName().equalsIgnoreCase(segName)) {
+          for (Segment segName : updatedSegmentsList) {
+            if (loadMetadata.getLoadName().equalsIgnoreCase(segName.getSegmentNo())) {
              // if this call is coming from the delete delta flow then the time stamp
              // string will be empty, so there is no need to write into the table status file.
               if (isTimestampUpdationRequired) {
@@ -240,6 +258,10 @@ public class CarbonUpdateUtil {
                 // update end timestamp for each time.
                 loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
               }
+              if (segmentFilesTobeUpdated.contains(Segment.toSegment(loadMetadata.getLoadName()))) {
+                loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
+                    + CarbonTablePath.SEGMENT_EXT);
+              }
             }
           }
         }
@@ -696,14 +718,14 @@ public class CarbonUpdateUtil {
    * @param segmentBlockCount
    * @return
    */
-  public static List<String> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
-    List<String> segmentsToBeDeleted =
+  public static List<Segment> getListOfSegmentsToMarkDeleted(Map<String, Long> segmentBlockCount) {
+    List<Segment> segmentsToBeDeleted =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
     for (Map.Entry<String, Long> eachSeg : segmentBlockCount.entrySet()) {
 
       if (eachSeg.getValue() == 0) {
-        segmentsToBeDeleted.add(eachSeg.getKey());
+        segmentsToBeDeleted.add(new Segment(eachSeg.getKey(), null));
       }
 
     }
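
The getTableBlockPath change above is what makes the update/delete flow partition-aware:
for a partitioned table the PART_ID field of the tuple id already carries the partition
folder, so no Part/Segment prefixes are added. A sketch of the two resulting path shapes,
assuming "Part" and "Segment_" are the prefixes added by CarbonTablePath.addPartPrefix and
addSegmentPrefix (paths illustrative):

public class BlockPathSketch {

  // Mirrors the branch added to CarbonUpdateUtil.getTableBlockPath.
  static String tableBlockPath(String factPath, String partField, String segmentId,
      boolean isPartitionTable) {
    if (isPartitionTable) {
      // Partitioned table: the PART_ID field already holds the partition folder.
      return factPath + "/" + partField;
    }
    // Non-partitioned table: the classic Part/Segment layout under the fact directory.
    return factPath + "/Part" + partField + "/Segment_" + segmentId;
  }

  public static void main(String[] args) {
    System.out.println(tableBlockPath("/store/db/t1/Fact", "0", "0", false));
    // -> /store/db/t1/Fact/Part0/Segment_0
    System.out.println(tableBlockPath("/store/db/t1", "country=India", null, true));
    // -> /store/db/t1/country=India
  }
}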

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6490694..cc2e513 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -74,7 +74,6 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -269,7 +268,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       blockExecutionInfoList.add(getBlockExecutionInfoForBlock(queryModel, abstractIndex,
           dataRefNode.getBlockInfos().get(0).getBlockletInfos().getStartBlockletNumber(),
           dataRefNode.numberOfNodes(), dataRefNode.getBlockInfos().get(0).getFilePath(),
-          dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath()));
+          dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
+          dataRefNode.getBlockInfos().get(0).getSegmentId()));
     }
     if (null != queryModel.getStatisticsRecorder()) {
       QueryStatistic queryStatistic = new QueryStatistic();
@@ -291,7 +291,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   protected BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel,
       AbstractIndex blockIndex, int startBlockletIndex, int numberOfBlockletToScan, String filePath,
-      String[] deleteDeltaFiles)
+      String[] deleteDeltaFiles, String segmentId)
       throws QueryExecutionException {
     BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo();
     SegmentProperties segmentProperties = blockIndex.getSegmentProperties();
@@ -304,11 +304,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
             queryModel.getQueryDimension(), tableBlockDimensions,
             segmentProperties.getComplexDimensions());
-    int tableFactPathLength = CarbonStorePath
-        .getCarbonTablePath(queryModel.getAbsoluteTableIdentifier().getTablePath(),
-            queryModel.getAbsoluteTableIdentifier().getCarbonTableIdentifier()).getFactDir()
-        .length() + 1;
-    blockExecutionInfo.setBlockId(filePath.substring(tableFactPathLength));
+    blockExecutionInfo.setBlockId(
+        CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId));
     blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
@@ -457,6 +454,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     return blockExecutionInfo;
   }
 
+
+
   /**
   * This method will be used to get the fixed key length size, which is used
   * to create a row from the column chunk
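
The hunk above threads the segment id into getBlockExecutionInfoForBlock so that the block
id is derived via CarbonUtil.getBlockId instead of stripping a fixed Fact-directory prefix
off the file path. A small standalone sketch (plain Java, hypothetical paths) of why the
old prefix-stripping cannot work once partition locations live outside Fact/Part0:

public class BlockIdPrefixSketch {
  public static void main(String[] args) {
    String tablePath = "/store/db/tbl"; // hypothetical
    String factDir = tablePath + "/Fact";
    String legacyFile = factDir + "/Part0/Segment_0/part-0-0_batchno0-0-1.carbondata";
    String partitionFile = tablePath + "/country=US/part-0-0_batchno0-0-1.carbondata";

    // Old logic: assumes every data file sits under the Fact directory.
    System.out.println(legacyFile.substring(factDir.length() + 1));
    // Prints: Part0/Segment_0/part-0-0_batchno0-0-1.carbondata

    // A partition-location file does not start with factDir at all, so the
    // same substring would be wrong; the patch delegates to
    // CarbonUtil.getBlockId(identifier, filePath, segmentId) instead.
    System.out.println(partitionFile.startsWith(factDir)); // false
  }
}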

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 85602bc..a0fa67d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -123,6 +123,11 @@ public class LoadMetadataDetails implements Serializable {
    */
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
+  /**
+   * Segment file name which stores the partition information of this load.
+   */
+  private String segmentFile;
+
   public String getPartitionCount() {
     return partitionCount;
   }
@@ -417,4 +422,17 @@ public class LoadMetadataDetails implements Serializable {
   public void setFileFormat(FileFormat fileFormat) {
     this.fileFormat = fileFormat;
   }
+
+  public String getSegmentFile() {
+    return segmentFile;
+  }
+
+  public void setSegmentFile(String segmentFile) {
+    this.segmentFile = segmentFile;
+  }
+
+  @Override public String toString() {
+    return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", loadName='" + loadName + '\''
+        + ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + segmentFile + '\'' + '}';
+  }
 }
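
The new segmentFile field is populated as loadName + "_" + updatedTimeStamp + SEGMENT_EXT,
as the CarbonUpdateUtil hunk earlier in this commit shows. A minimal sketch of that naming
convention (plain Java; the ".segment" value comes from the CarbonTablePath hunk below, the
sample inputs are hypothetical):

public class SegmentFileNameSketch {
  static final String SEGMENT_EXT = ".segment"; // CarbonTablePath.SEGMENT_EXT

  static String segmentFileName(String loadName, long updatedTimeStamp) {
    return loadName + "_" + updatedTimeStamp + SEGMENT_EXT;
  }

  public static void main(String[] args) {
    System.out.println(segmentFileName("2", 1519714752000L)); // 2_1519714752000.segment
  }
}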

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 9d14c62..76c2dc7 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -33,6 +33,7 @@ import java.util.List;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
@@ -99,13 +100,14 @@ public class SegmentStatusManager {
   public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
 
     // @TODO: move reading LoadStatus file to separate class
-    List<String> listOfValidSegments = new ArrayList<>(10);
-    List<String> listOfValidUpdatedSegments = new ArrayList<>(10);
-    List<String> listOfInvalidSegments = new ArrayList<>(10);
-    List<String> listOfStreamSegments = new ArrayList<>(10);
+    List<Segment> listOfValidSegments = new ArrayList<>(10);
+    List<Segment> listOfValidUpdatedSegments = new ArrayList<>(10);
+    List<Segment> listOfInvalidSegments = new ArrayList<>(10);
+    List<Segment> listOfStreamSegments = new ArrayList<>(10);
+    List<Segment> listOfInProgressSegments = new ArrayList<>(10);
     CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
+        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
     String dataPath = carbonTablePath.getTableStatusFilePath();
     DataInputStream dataInputStream = null;
 
@@ -113,7 +115,7 @@ public class SegmentStatusManager {
     Gson gson = new Gson();
 
     AtomicFileOperations fileOperation =
-            new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
+        new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
     LoadMetadataDetails[] loadFolderDetailsArray;
     try {
       if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
@@ -127,37 +129,44 @@ public class SegmentStatusManager {
         }
         //just directly iterate Array
         for (LoadMetadataDetails segment : loadFolderDetailsArray) {
-          if (SegmentStatus.SUCCESS == segment.getSegmentStatus() ||
-              SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus() ||
-              SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus() ||
-              SegmentStatus.STREAMING == segment.getSegmentStatus() ||
-              SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
+          if (SegmentStatus.SUCCESS == segment.getSegmentStatus()
+              || SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()
+              || SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus()
+              || SegmentStatus.STREAMING == segment.getSegmentStatus()
+              || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
             // check for merged loads.
             if (null != segment.getMergedLoadName()) {
-              if (!listOfValidSegments.contains(segment.getMergedLoadName())) {
-                listOfValidSegments.add(segment.getMergedLoadName());
+              Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile());
+              if (!listOfValidSegments.contains(seg)) {
+                listOfValidSegments.add(seg);
               }
               // if merged load is updated then put it in updated list
               if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
-                listOfValidUpdatedSegments.add(segment.getMergedLoadName());
+                listOfValidUpdatedSegments.add(seg);
               }
               continue;
             }
 
             if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
 
-              listOfValidUpdatedSegments.add(segment.getLoadName());
+              listOfValidUpdatedSegments
+                  .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
             }
-            if (SegmentStatus.STREAMING == segment.getSegmentStatus() ||
-                SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
-              listOfStreamSegments.add(segment.getLoadName());
+            if (SegmentStatus.STREAMING == segment.getSegmentStatus()
+                || SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
+              listOfStreamSegments
+                  .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
               continue;
             }
-            listOfValidSegments.add(segment.getLoadName());
-          } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() ||
-              SegmentStatus.COMPACTED == segment.getSegmentStatus() ||
-              SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
-            listOfInvalidSegments.add(segment.getLoadName());
+            listOfValidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+          } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
+              || SegmentStatus.COMPACTED == segment.getSegmentStatus()
+              || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
+            listOfInvalidSegments.add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
+          } else if (SegmentStatus.INSERT_IN_PROGRESS == segment.getSegmentStatus() ||
+              SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segment.getSegmentStatus()) {
+            listOfInProgressSegments
+                .add(new Segment(segment.getLoadName(), segment.getSegmentFile()));
           }
         }
       }
@@ -168,7 +177,7 @@ public class SegmentStatusManager {
       CarbonUtil.closeStreams(dataInputStream);
     }
     return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
-            listOfInvalidSegments, listOfStreamSegments);
+        listOfInvalidSegments, listOfStreamSegments, listOfInProgressSegments);
   }
 
   /**
@@ -688,29 +697,35 @@ public class SegmentStatusManager {
 
 
   public static class ValidAndInvalidSegmentsInfo {
-    private final List<String> listOfValidSegments;
-    private final List<String> listOfValidUpdatedSegments;
-    private final List<String> listOfInvalidSegments;
-    private final List<String> listOfStreamSegments;
-
-    private ValidAndInvalidSegmentsInfo(List<String> listOfValidSegments,
-        List<String> listOfValidUpdatedSegments, List<String> listOfInvalidUpdatedSegments,
-        List<String> listOfStreamSegments) {
+    private final List<Segment> listOfValidSegments;
+    private final List<Segment> listOfValidUpdatedSegments;
+    private final List<Segment> listOfInvalidSegments;
+    private final List<Segment> listOfStreamSegments;
+    private final List<Segment> listOfInProgressSegments;
+
+    private ValidAndInvalidSegmentsInfo(List<Segment> listOfValidSegments,
+        List<Segment> listOfValidUpdatedSegments, List<Segment> listOfInvalidUpdatedSegments,
+        List<Segment> listOfStreamSegments, List<Segment> listOfInProgressSegments) {
       this.listOfValidSegments = listOfValidSegments;
       this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
       this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
       this.listOfStreamSegments = listOfStreamSegments;
+      this.listOfInProgressSegments = listOfInProgressSegments;
     }
-    public List<String> getInvalidSegments() {
+    public List<Segment> getInvalidSegments() {
       return listOfInvalidSegments;
     }
-    public List<String> getValidSegments() {
+    public List<Segment> getValidSegments() {
       return listOfValidSegments;
     }
 
-    public List<String> getStreamSegments() {
+    public List<Segment> getStreamSegments() {
       return listOfStreamSegments;
     }
+
+    public List<Segment> getListOfInProgressSegments() {
+      return listOfInProgressSegments;
+    }
   }
 
   /**
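
getValidAndInvalidSegments now buckets table-status entries into Segment lists, and
INSERT_IN_PROGRESS / INSERT_OVERWRITE_IN_PROGRESS loads get their own bucket where they
previously fell through unclassified. A standalone sketch of the classification (plain
Java; the enum lists only the statuses used here and the sample rows are hypothetical):

import java.util.ArrayList;
import java.util.List;

public class SegmentBucketsSketch {
  enum Status { SUCCESS, MARKED_FOR_UPDATE, LOAD_PARTIAL_SUCCESS, STREAMING,
    STREAMING_FINISH, LOAD_FAILURE, COMPACTED, MARKED_FOR_DELETE,
    INSERT_IN_PROGRESS, INSERT_OVERWRITE_IN_PROGRESS }

  public static void main(String[] args) {
    List<String> valid = new ArrayList<>();
    List<String> invalid = new ArrayList<>();
    List<String> inProgress = new ArrayList<>();
    String[][] tableStatus = {
        {"0", "SUCCESS"}, {"1", "MARKED_FOR_DELETE"}, {"2", "INSERT_IN_PROGRESS"}};
    for (String[] row : tableStatus) {
      switch (Status.valueOf(row[1])) {
        case SUCCESS:
        case MARKED_FOR_UPDATE:
        case LOAD_PARTIAL_SUCCESS:
        case STREAMING:
        case STREAMING_FINISH:
          valid.add(row[0]);
          break;
        case LOAD_FAILURE:
        case COMPACTED:
        case MARKED_FOR_DELETE:
          invalid.add(row[0]);
          break;
        case INSERT_IN_PROGRESS:
        case INSERT_OVERWRITE_IN_PROGRESS:
          inProgress.add(row[0]); // new bucket introduced by this patch
          break;
      }
    }
    System.out.println("valid=" + valid + " invalid=" + invalid + " inProgress=" + inProgress);
  }
}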

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index e0e7b70..71b6ba8 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -70,6 +71,7 @@ public class SegmentUpdateStatusManager {
   private SegmentUpdateDetails[] updateDetails;
   private CarbonTablePath carbonTablePath;
   private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
+  private boolean isPartitionTable;
 
   /**
    * @param absoluteTableIdentifier
@@ -83,6 +85,9 @@ public class SegmentUpdateStatusManager {
     // on latest file status.
     segmentDetails =
         segmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
+    if (segmentDetails.length > 0) {
+      isPartitionTable = segmentDetails[0].getSegmentFile() != null;
+    }
     updateDetails = readLoadMetadata();
     populateMap();
   }
@@ -268,12 +273,14 @@ public class SegmentUpdateStatusManager {
    * @return all delete delta files
    * @throws Exception
    */
-  public String[] getDeleteDeltaFilePath(String blockFilePath) throws Exception {
-    int tableFactPathLength = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier()).getFactDir().length() + 1;
-    String blockame = blockFilePath.substring(tableFactPathLength);
-    String tupleId = CarbonTablePath.getShortBlockId(blockame);
+  public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
+    String blockId = CarbonUtil.getBlockId(absoluteTableIdentifier, blockFilePath, segmentId);
+    String tupleId;
+    if (isPartitionTable) {
+      tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
+    } else {
+      tupleId = CarbonTablePath.getShortBlockId(blockId);
+    }
     return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
         .toArray(new String[0]);
   }
@@ -292,12 +299,19 @@ public class SegmentUpdateStatusManager {
           .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
               absoluteTableIdentifier.getCarbonTableIdentifier());
       String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
-      String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
       String completeBlockName = CarbonTablePath.addDataPartPrefix(
           CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
               + CarbonCommonConstants.FACT_FILE_EXT);
-      String blockPath =
-          carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+      String blockPath;
+      if (isPartitionTable) {
+        blockPath = absoluteTableIdentifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
+            + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
+            .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+      } else {
+        String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+        blockPath =
+            carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+      }
       CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath));
       if (!file.exists()) {
         throw new Exception("Invalid tuple id " + tupleId);
@@ -306,8 +320,7 @@ public class SegmentUpdateStatusManager {
       //blockName without timestamp
       final String blockNameFromTuple =
           blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
-      return getDeltaFiles(file, blockNameFromTuple, extension,
-          segment);
+      return getDeltaFiles(file, blockNameFromTuple, extension, segment);
     } catch (Exception ex) {
       String errorMsg = "Invalid tuple id " + tupleId;
       LOG.error(errorMsg);
@@ -418,20 +431,20 @@ public class SegmentUpdateStatusManager {
    * @param blockName
    * @return
    */
-  public CarbonFile[] getDeleteDeltaFilesList(final String segmentId, final String blockName) {
+  public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
 
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());
 
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId.getSegmentNo());
 
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
 
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
-          (block.getSegmentName().equalsIgnoreCase(segmentId))
+          (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
           && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
         final long deltaStartTimestamp =
             getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
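
For partition tables the tuple id now carries a '#'-encoded PART_ID, and the block path is
rebuilt by mapping '#' back to '/' under the table path, as the getDeltaFiles hunk above
shows. A minimal sketch of that path assembly (plain Java; table path and block name are
hypothetical):

public class PartitionBlockPathSketch {
  public static void main(String[] args) {
    String tablePath = "/store/db/tbl"; // hypothetical
    String partIdFromTupleId = "country=US#city=NY"; // '#' encodes directory nesting
    String completeBlockName = "part-0-0_batchno0-0-1.carbondata";

    String blockPath = tablePath + "/" + partIdFromTupleId.replace("#", "/")
        + "/" + completeBlockName;
    System.out.println(blockPath);
    // Prints: /store/db/tbl/country=US/city=NY/part-0-0_batchno0-0-1.carbondata
  }
}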

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c208154..eb0a9d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -47,6 +47,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -63,6 +64,7 @@ import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
@@ -1437,14 +1439,13 @@ public final class CarbonUtil {
    * @param values
    * @return comma separated segment string
    */
-  public static String convertToString(List<String> values) {
+  public static String convertToString(List<Segment> values) {
     if (values == null || values.isEmpty()) {
       return "";
     }
     StringBuilder segmentStringbuilder = new StringBuilder();
     for (int i = 0; i < values.size() - 1; i++) {
-      String segmentNo = values.get(i);
-      segmentStringbuilder.append(segmentNo);
+      segmentStringbuilder.append(values.get(i));
       segmentStringbuilder.append(",");
     }
     segmentStringbuilder.append(values.get(values.size() - 1));
@@ -2145,7 +2146,8 @@ public final class CarbonUtil {
       }
       for (String value : values) {
         if (!value.equalsIgnoreCase("*")) {
-          Float aFloatValue = Float.parseFloat(value);
+          Segment segment = Segment.toSegment(value);
+          Float aFloatValue = Float.parseFloat(segment.getSegmentNo());
           if (aFloatValue < 0 || aFloatValue > Float.MAX_VALUE) {
             throw new InvalidConfigurationException(
                 "carbon.input.segments.<database_name>.<table_name> value range should be greater "
@@ -2298,7 +2300,7 @@ public final class CarbonUtil {
   }
 
   // Get the total size of carbon data and the total size of carbon index
-  public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
+  private static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
       String segmentId) throws IOException {
     long carbonDataSize = 0L;
     long carbonIndexSize = 0L;
@@ -2350,6 +2352,51 @@ public final class CarbonUtil {
     return dataAndIndexSize;
   }
 
+  // Get the total size of carbon data and the total size of carbon index
+  private static HashMap<String, Long> getDataSizeAndIndexSize(SegmentFileStore fileStore)
+      throws IOException {
+    long carbonDataSize = 0L;
+    long carbonIndexSize = 0L;
+    HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
+    if (fileStore.getLocationMap() != null) {
+      fileStore.readIndexFiles();
+      Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
+      for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+        carbonIndexSize += FileFactory.getCarbonFile(entry.getKey()).getSize();
+        for (String blockFile : entry.getValue()) {
+          carbonDataSize += FileFactory.getCarbonFile(blockFile).getSize();
+        }
+      }
+    }
+    dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE, carbonDataSize);
+    dataAndIndexSize.put(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE, carbonIndexSize);
+    return dataAndIndexSize;
+  }
+
+  // Get the total size of carbon data and the total size of carbon index
+  public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
+      Segment segment) throws IOException {
+    if (segment.getSegmentFileName() != null) {
+      SegmentFileStore fileStore =
+          new SegmentFileStore(carbonTablePath.getPath(), segment.getSegmentFileName());
+      return getDataSizeAndIndexSize(fileStore);
+    } else {
+      return getDataSizeAndIndexSize(carbonTablePath, segment.getSegmentNo());
+    }
+  }
+
+  // Get the total size of segment.
+  public static long getSizeOfSegment(CarbonTablePath carbonTablePath,
+      Segment segment) throws IOException {
+    HashMap<String, Long> dataSizeAndIndexSize = getDataSizeAndIndexSize(carbonTablePath, segment);
+    long size = 0;
+    for (Long eachSize: dataSizeAndIndexSize.values()) {
+      size += eachSize;
+    }
+    return size;
+  }
+
+
   /**
    * Utility function to check whether table has timseries datamap or not
    * @param carbonTable
@@ -2449,5 +2496,43 @@ public final class CarbonUtil {
     return updatedMinMaxValues;
   }
 
+  /**
+   * Generate the block id from the given block file path
+   *
+   * @param identifier
+   * @param filePath
+   * @param segmentId
+   * @return
+   */
+  public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
+      String segmentId) {
+    String blockId;
+    String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
+    String tablePath = identifier.getTablePath();
+    if (filePath.startsWith(tablePath)) {
+      String factDir =
+          CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier())
+              .getFactDir();
+      if (filePath.startsWith(factDir)) {
+        blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+            + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+      } else {
+        // This is the case with partition table.
+        String partitionDir =
+            filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
+
+        // Replace / with # in the partition directory to support multi-level partitioning
+        // and to access all the levels as a single entity.
+        blockId = partitionDir.replace("/", "#") + CarbonCommonConstants.FILE_SEPARATOR + "Segment_"
+            + segmentId + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+      }
+    } else {
+      blockId = filePath.substring(0, filePath.length() - blockName.length()).replace("/", "#")
+          + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
+          + CarbonCommonConstants.FILE_SEPARATOR + blockName;
+    }
+    return blockId;
+  }
+
 }
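
The three branches of the new CarbonUtil.getBlockId can be exercised in isolation. A
standalone sketch mirroring that logic (plain Java; the paths are hypothetical and the
file separator is assumed to be "/"):

public class GetBlockIdSketch {
  static final String SEP = "/";

  // Mirrors CarbonUtil.getBlockId: fact dir, partition dir under the table
  // path, and partition dir outside the table path.
  static String blockId(String tablePath, String factDir, String filePath, String segmentId) {
    String blockName = filePath.substring(filePath.lastIndexOf('/') + 1);
    if (filePath.startsWith(factDir)) {
      return "Part0" + SEP + "Segment_" + segmentId + SEP + blockName;
    } else if (filePath.startsWith(tablePath)) {
      // Partition table: '#' encodes the partition directory levels.
      String partitionDir =
          filePath.substring(tablePath.length() + 1, filePath.length() - blockName.length() - 1);
      return partitionDir.replace("/", "#") + SEP + "Segment_" + segmentId + SEP + blockName;
    } else {
      // Partition located outside the table path.
      return filePath.substring(0, filePath.length() - blockName.length()).replace("/", "#")
          + SEP + "Segment_" + segmentId + SEP + blockName;
    }
  }

  public static void main(String[] args) {
    String tablePath = "/store/db/tbl";
    String factDir = tablePath + "/Fact";
    System.out.println(blockId(tablePath, factDir,
        factDir + "/Part0/Segment_2/part-0-0.carbondata", "2"));
    // Part0/Segment_2/part-0-0.carbondata
    System.out.println(blockId(tablePath, factDir,
        tablePath + "/country=US/city=NY/part-0-0.carbondata", "2"));
    // country=US#city=NY/Segment_2/part-0-0.carbondata
  }
}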
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
index d4b328d..f1603dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.util;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 
@@ -26,18 +27,33 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable {
   private static final long serialVersionUID = -1718154403432354200L;
 
   public Object convertToDecimal(Object data) {
-    return new java.math.BigDecimal(data.toString());
+    if (null == data) {
+      return null;
+    }
+    if (data instanceof BigDecimal) {
+      return data;
+    }
+    return new BigDecimal(data.toString());
   }
 
   public Object convertFromByteToUTF8String(Object data) {
+    if (null == data) {
+      return null;
+    }
     return new String((byte[]) data, CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
   }
 
   public byte[] convertFromStringToByte(Object data) {
+    if (null == data) {
+      return null;
+    }
     return data.toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
   }
 
   public Object convertFromStringToUTF8String(Object data) {
+    if (null == data) {
+      return null;
+    }
     return data.toString();
   }
 }
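
All four converter methods are hardened the same way: null in, null out, and
convertToDecimal additionally passes an existing BigDecimal through untouched. A minimal
sketch of the decimal case (plain Java):

import java.math.BigDecimal;

public class NullSafeConverterSketch {
  // Mirrors the hardened convertToDecimal.
  static Object convertToDecimal(Object data) {
    if (data == null) {
      return null;
    }
    if (data instanceof BigDecimal) {
      return data; // already converted, avoid a useless reparse
    }
    return new BigDecimal(data.toString());
  }

  public static void main(String[] args) {
    System.out.println(convertToDecimal(null));                 // null
    System.out.println(convertToDecimal("12.5"));               // 12.5
    System.out.println(convertToDecimal(new BigDecimal("3")));  // 3 (same instance)
  }
}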

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index c370b14..4602cc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -83,25 +83,7 @@ public final class DataTypeUtil {
    */
   public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
       CarbonMeasure carbonMeasure) {
-    if (dataType == DataTypes.BOOLEAN) {
-      return BooleanConvert.parseBoolean(msrValue);
-    } else if (DataTypes.isDecimal(dataType)) {
-      BigDecimal bigDecimal =
-          new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
-      return normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
-    } else if (dataType == DataTypes.SHORT) {
-      return Short.parseShort(msrValue);
-    } else if (dataType == DataTypes.INT) {
-      return Integer.parseInt(msrValue);
-    } else if (dataType == DataTypes.LONG) {
-      return Long.valueOf(msrValue);
-    } else {
-      Double parsedValue = Double.valueOf(msrValue);
-      if (Double.isInfinite(parsedValue) || Double.isNaN(parsedValue)) {
-        return null;
-      }
-      return parsedValue;
-    }
+    return getMeasureValueBasedOnDataType(msrValue, dataType, carbonMeasure, false);
   }
 
   /**
@@ -112,15 +94,19 @@ public final class DataTypeUtil {
    * @param carbonMeasure
    * @return
    */
-  public static Object getConvertedMeasureValueBasedOnDataType(String msrValue, DataType dataType,
-      CarbonMeasure carbonMeasure) {
+  public static Object getMeasureValueBasedOnDataType(String msrValue, DataType dataType,
+      CarbonMeasure carbonMeasure, boolean useConverter) {
     if (dataType == DataTypes.BOOLEAN) {
       return BooleanConvert.parseBoolean(msrValue);
     } else if (DataTypes.isDecimal(dataType)) {
       BigDecimal bigDecimal =
           new BigDecimal(msrValue).setScale(carbonMeasure.getScale(), RoundingMode.HALF_UP);
-      return converter
-          .convertToDecimal(normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision()));
+      BigDecimal decimal = normalizeDecimalValue(bigDecimal, carbonMeasure.getPrecision());
+      if (useConverter) {
+        return converter.convertToDecimal(decimal);
+      } else {
+        return decimal;
+      }
     } else if (dataType == DataTypes.SHORT) {
       return Short.parseShort(msrValue);
     } else if (dataType == DataTypes.INT) {
@@ -815,6 +801,8 @@ public final class DataTypeUtil {
    */
   public static void setDataTypeConverter(DataTypeConverter converterLocal) {
     converter = converterLocal;
+    timeStampformatter.remove();
+    dateformatter.remove();
   }
 
   public static DataTypeConverter getDataTypeConverter() {
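
The former getConvertedMeasureValueBasedOnDataType is folded into a four-argument overload,
and the three-argument form keeps its old behaviour by delegating with useConverter = false.
A simplified sketch of the decimal branch of that delegation (plain Java; scale handling is
reduced to an int parameter and the pluggable converter step is only indicated by a comment):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class MeasureValueSketch {
  // Old entry point: unchanged behaviour via delegation.
  static Object measureValue(String msrValue, int scale) {
    return measureValue(msrValue, scale, false);
  }

  static Object measureValue(String msrValue, int scale, boolean useConverter) {
    BigDecimal decimal = new BigDecimal(msrValue).setScale(scale, RoundingMode.HALF_UP);
    if (useConverter) {
      // The real code hands the value to the pluggable DataTypeConverter here
      // (e.g. to produce an engine-specific decimal type).
      return decimal;
    }
    return decimal;
  }

  public static void main(String[] args) {
    System.out.println(measureValue("12.345", 2)); // 12.35
  }
}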

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 5a63d2f..d70d9ef 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -48,7 +48,7 @@ public class CarbonTablePath extends Path {
   public static final String CARBON_DATA_EXT = ".carbondata";
   public static final String INDEX_FILE_EXT = ".carbonindex";
   public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";
-  public static final String PARTITION_MAP_EXT = ".partitionmap";
+  public static final String SEGMENT_EXT = ".segment";
 
   private static final String STREAMING_DIR = ".streaming";
   private static final String STREAMING_LOG_DIR = "log";
@@ -111,17 +111,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * Return true if the fileNameWithPath ends with partition map file extension name
-   */
-  public static boolean isPartitionMapFile(String fileNameWithPath) {
-    int pos = fileNameWithPath.lastIndexOf('.');
-    if (pos != -1) {
-      return fileNameWithPath.substring(pos).startsWith(PARTITION_MAP_EXT);
-    }
-    return false;
-  }
-
-  /**
    * check if it is carbon index file matching extension
    *
    * @param fileNameWithPath
@@ -667,6 +656,18 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * This method will strip the path prefixes and return the short block id
+   *
+   * @param blockId
+   * @return shortBlockId
+   */
+  public static String getShortBlockIdForPartitionTable(String blockId) {
+    return blockId.replace(SEGMENT_PREFIX, "")
+        .replace(DATA_PART_PREFIX, "")
+        .replace(CARBON_DATA_EXT, "");
+  }
+
+  /**
    * This method will append strings in path and return block id
    *
    * @param shortBlockId
@@ -735,4 +736,12 @@ public class CarbonTablePath extends Path {
   public static String getSegmentPath(String tablePath, String segmentId) {
     return tablePath + "/Fact/Part0/Segment_" + segmentId;
   }
+
+  /**
+   * Get the segment file locations of table
+   */
+  public static String getSegmentFilesLocation(String tablePath) {
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments";
+  }
+
 }
\ No newline at end of file
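
getShortBlockIdForPartitionTable strips the same decorations as getShortBlockId but keeps
the '#'-encoded partition directories intact. A standalone sketch (plain Java; SEGMENT_EXT,
CARBON_DATA_EXT and the "Segment_" prefix appear elsewhere in this diff, while the
DATA_PART_PREFIX value below is an assumption):

public class ShortBlockIdSketch {
  static final String SEGMENT_PREFIX = "Segment_";
  static final String DATA_PART_PREFIX = "part-"; // assumed constant value
  static final String CARBON_DATA_EXT = ".carbondata";

  // Mirrors getShortBlockIdForPartitionTable.
  static String shortBlockIdForPartitionTable(String blockId) {
    return blockId.replace(SEGMENT_PREFIX, "")
        .replace(DATA_PART_PREFIX, "")
        .replace(CARBON_DATA_EXT, "");
  }

  public static void main(String[] args) {
    String blockId = "country=US#city=NY/Segment_2/part-0-0_batchno0-0-1.carbondata";
    System.out.println(shortBlockIdForPartitionTable(blockId));
    // Prints: country=US#city=NY/2/0-0_batchno0-0-1
  }
}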