You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/26 12:19:09 UTC

[9/9] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature

[CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature

This closes #1984


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8d3c7740
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8d3c7740
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8d3c7740

Branch: refs/heads/master
Commit: 8d3c77400cd6abc30c6914084268d9ecd3d18751
Parents: dded5d5
Author: ravipesala <ra...@gmail.com>
Authored: Thu Feb 15 00:31:56 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Feb 26 17:43:13 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapDistributable.java      |  10 +-
 .../apache/carbondata/core/datamap/Segment.java |  97 +++
 .../carbondata/core/datamap/TableDataMap.java   |  45 +-
 .../carbondata/core/datamap/dev/DataMap.java    |   3 +-
 .../core/datamap/dev/DataMapFactory.java        |   9 +-
 .../filesystem/AbstractDFSCarbonFile.java       |  27 +-
 .../indexstore/BlockletDataMapIndexStore.java   | 107 +--
 .../core/indexstore/BlockletDetailsFetcher.java |  10 +-
 .../core/indexstore/PartitionSpec.java          |  92 +++
 .../TableBlockIndexUniqueIdentifier.java        |  87 +--
 .../blockletindex/BlockletDataMap.java          | 134 ++--
 .../BlockletDataMapDistributable.java           |   4 +-
 .../blockletindex/BlockletDataMapFactory.java   | 104 ++-
 .../blockletindex/BlockletDataMapModel.java     |  27 +-
 .../blockletindex/SegmentIndexFileStore.java    |  94 ++-
 .../carbondata/core/locks/HdfsFileLock.java     |  16 -
 .../carbondata/core/locks/LocalFileLock.java    |  16 +-
 .../core/metadata/PartitionMapFileStore.java    | 484 ------------
 .../core/metadata/SegmentFileStore.java         | 744 +++++++++++++++++++
 .../core/mutate/CarbonUpdateUtil.java           |  50 +-
 .../executor/impl/AbstractQueryExecutor.java    |  15 +-
 .../core/statusmanager/LoadMetadataDetails.java |  18 +
 .../statusmanager/SegmentStatusManager.java     |  87 ++-
 .../SegmentUpdateStatusManager.java             |  41 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  95 ++-
 .../core/util/DataTypeConverterImpl.java        |  18 +-
 .../carbondata/core/util/DataTypeUtil.java      |  34 +-
 .../core/util/path/CarbonTablePath.java         |  33 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   9 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |   9 +-
 .../hadoop/api/CarbonOutputCommitter.java       | 191 +++--
 .../hadoop/api/CarbonTableInputFormat.java      | 128 ++--
 .../hadoop/api/CarbonTableOutputFormat.java     |   2 +
 .../hadoop/api/DistributableDataMapFormat.java  |  10 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |   6 +-
 .../MajorCompactionStopsAfterCompaction.scala   |   2 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |  10 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |  10 +-
 .../StandardPartitionBadRecordLoggerTest.scala  |  17 -
 .../StandardPartitionGlobalSortTestCase.scala   | 388 ++++++++--
 .../StandardPartitionTableCleanTestCase.scala   |  72 +-
 ...andardPartitionTableCompactionTestCase.scala |  36 +-
 .../StandardPartitionTableDropTestCase.scala    |   1 +
 .../StandardPartitionTableLoadingTestCase.scala |  62 +-
 ...tandardPartitionTableOverwriteTestCase.scala |  76 +-
 .../StandardPartitionTableQueryTestCase.scala   |  79 +-
 .../spark/util/SparkDataTypeConverterImpl.java  |  12 +
 .../org/apache/carbondata/api/CarbonStore.scala |  12 +-
 .../spark/rdd/CarbonDropPartitionRDD.scala      | 118 +--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  52 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   3 +-
 .../spark/rdd/CarbonSparkPartition.scala        |   3 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala | 207 ++++--
 .../carbondata/spark/util/CommonUtil.scala      |  27 +-
 .../carbondata/spark/util/DataLoadingUtil.scala |  88 ++-
 .../command/carbonTableSchemaCommon.scala       |  17 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  49 +-
 .../spark/rdd/CarbonTableCompactor.scala        |  72 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   4 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   3 +-
 .../CarbonAlterTableCompactionCommand.scala     |   4 +-
 .../management/CarbonCleanFilesCommand.scala    |  13 +-
 .../management/CarbonLoadDataCommand.scala      | 432 +++++++----
 .../management/RefreshCarbonTableCommand.scala  |  27 +-
 .../CarbonProjectForUpdateCommand.scala         |   3 +-
 .../command/mutation/DeleteExecution.scala      |  20 +-
 .../command/mutation/HorizontalCompaction.scala |   5 +-
 ...arbonAlterTableAddHivePartitionCommand.scala | 117 +++
 ...rbonAlterTableDropHivePartitionCommand.scala | 102 +--
 .../CarbonAlterTableDropPartitionCommand.scala  |  10 +-
 .../CarbonAlterTableSplitPartitionCommand.scala |   4 +-
 .../CreatePreAggregateTableCommand.scala        |   7 +-
 .../preaaggregate/PreAggregateListeners.scala   |  13 +-
 .../table/CarbonCreateTableCommand.scala        |  13 +
 .../datasources/CarbonFileFormat.scala          | 222 +++---
 .../strategy/CarbonLateDecodeStrategy.scala     |  15 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  15 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  18 +-
 .../spark/sql/optimizer/CarbonFilters.scala     |  59 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  12 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  12 +-
 .../datamap/DataMapWriterListener.java          |   3 +-
 .../loading/CarbonDataLoadConfiguration.java    |  13 +
 .../loading/DataLoadProcessBuilder.java         |   4 +-
 .../impl/MeasureFieldConverterImpl.java         |   2 +-
 .../loading/model/CarbonLoadModel.java          |  13 +
 .../processing/merger/CarbonDataMergerUtil.java |  77 +-
 .../merger/CompactionResultSortProcessor.java   |  35 +-
 .../merger/RowResultMergerProcessor.java        |  34 +-
 .../partition/spliter/RowResultProcessor.java   |   5 +-
 .../store/CarbonFactDataHandlerModel.java       |  10 +-
 .../util/CarbonDataProcessorUtil.java           |   8 +-
 .../processing/util/CarbonLoaderUtil.java       |  39 +-
 .../processing/util/DeleteLoadFolders.java      |  80 +-
 .../carbon/datastore/BlockIndexStoreTest.java   |   6 +-
 .../carbondata/streaming/StreamHandoffRDD.scala |   5 +-
 97 files changed, 3741 insertions(+), 1994 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
index 50af789..edd724a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java
@@ -31,7 +31,7 @@ public abstract class DataMapDistributable extends InputSplit
 
   private String tablePath;
 
-  private String segmentId;
+  private Segment segment;
 
   private String dataMapName;
 
@@ -47,12 +47,12 @@ public abstract class DataMapDistributable extends InputSplit
     this.tablePath = tablePath;
   }
 
-  public String getSegmentId() {
-    return segmentId;
+  public Segment getSegment() {
+    return segment;
   }
 
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
+  public void setSegment(Segment segment) {
+    this.segment = segment;
   }
 
   public String getDataMapName() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
new file mode 100644
index 0000000..c47f16c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Represents one load of carbondata
+ */
+public class Segment implements Serializable {
+
+  private static final long serialVersionUID = 7044555408162234064L;
+
+  private String segmentNo;
+
+  private String segmentFileName;
+
+  public Segment(String segmentNo, String segmentFileName) {
+    this.segmentNo = segmentNo;
+    this.segmentFileName = segmentFileName;
+  }
+
+  public String getSegmentNo() {
+    return segmentNo;
+  }
+
+  public String getSegmentFileName() {
+    return segmentFileName;
+  }
+
+  public static List<Segment> toSegmentList(String[] segmentIds) {
+    List<Segment> list = new ArrayList<>(segmentIds.length);
+    for (String segmentId : segmentIds) {
+      list.add(toSegment(segmentId));
+    }
+    return list;
+  }
+
+  public static List<Segment> toSegmentList(List<String> segmentIds) {
+    List<Segment> list = new ArrayList<>(segmentIds.size());
+    for (String segmentId : segmentIds) {
+      list.add(toSegment(segmentId));
+    }
+    return list;
+  }
+
+  /**
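+   * SegmentId can be either a plain segmentNo or "segmentNo#segmentFileName"
+   * @param segmentId segment number, optionally followed by '#' and the segment file name
+   * @return Segment holding the parsed segmentNo and segmentFileName (null when absent)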
+   */
+  public static Segment toSegment(String segmentId) {
+    String[] split = segmentId.split("#");
+    if (split.length > 1) {
+      return new Segment(split[0], split[1]);
+    } else if (split.length > 0) {
+      return new Segment(split[0], null);
+    }
+    return new Segment(segmentId, null);
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Segment segment = (Segment) o;
+    return Objects.equals(segmentNo, segment.segmentNo);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(segmentNo);
+  }
+
+  @Override public String toString() {
+    if (segmentFileName != null) {
+      return segmentNo + "#" + segmentFileName;
+    } else {
+      return segmentNo;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 9c84891..6555d6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.events.Event;
@@ -60,21 +61,21 @@ public final class TableDataMap extends OperationEventListener {
   /**
    * Pass the valid segments and prune the datamap using filter expression
    *
-   * @param segmentIds
+   * @param segments
    * @param filterExp
    * @return
    */
-  public List<ExtendedBlocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp,
-      List<String> partitions) throws IOException {
+  public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp,
+      List<PartitionSpec> partitions) throws IOException {
     List<ExtendedBlocklet> blocklets = new ArrayList<>();
-    for (String segmentId : segmentIds) {
+    for (Segment segment : segments) {
       List<Blocklet> pruneBlocklets = new ArrayList<>();
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
       for (DataMap dataMap : dataMaps) {
         pruneBlocklets.addAll(dataMap.prune(filterExp, partitions));
       }
       blocklets.addAll(addSegmentId(blockletDetailsFetcher
-          .getExtendedBlocklets(pruneBlocklets, segmentId), segmentId));
+          .getExtendedBlocklets(pruneBlocklets, segment), segment.getSegmentNo()));
     }
     return blocklets;
   }
@@ -94,13 +95,13 @@ public final class TableDataMap extends OperationEventListener {
    *
    * @return
    */
-  public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException {
+  public List<DataMapDistributable> toDistributable(List<Segment> segments) throws IOException {
     List<DataMapDistributable> distributables = new ArrayList<>();
-    for (String segmentsId : segmentIds) {
-      List<DataMapDistributable> list = dataMapFactory.toDistributable(segmentsId);
+    for (Segment segment : segments) {
+      List<DataMapDistributable> list = dataMapFactory.toDistributable(segment);
       for (DataMapDistributable distributable: list) {
         distributable.setDataMapName(dataMapName);
-        distributable.setSegmentId(segmentsId);
+        distributable.setSegment(segment);
         distributable.setTablePath(identifier.getTablePath());
         distributable.setDataMapFactoryClass(dataMapFactory.getClass().getName());
       }
@@ -118,7 +119,7 @@ public final class TableDataMap extends OperationEventListener {
    * @return
    */
   public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
-      FilterResolverIntf filterExp, List<String> partitions) throws IOException {
+      FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     List<Blocklet> blocklets = new ArrayList<>();
     List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable);
@@ -127,8 +128,8 @@ public final class TableDataMap extends OperationEventListener {
     }
     for (Blocklet blocklet: blocklets) {
       ExtendedBlocklet detailedBlocklet =
-          blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegmentId());
-      detailedBlocklet.setSegmentId(distributable.getSegmentId());
+          blockletDetailsFetcher.getExtendedBlocklet(blocklet, distributable.getSegment());
+      detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo());
       detailedBlocklets.add(detailedBlocklet);
     }
     return detailedBlocklets;
@@ -136,11 +137,11 @@ public final class TableDataMap extends OperationEventListener {
 
   /**
    * Clear only the datamaps of the segments
-   * @param segmentIds
+   * @param segments
    */
-  public void clear(List<String> segmentIds) {
-    for (String segmentId: segmentIds) {
-      dataMapFactory.clear(segmentId);
+  public void clear(List<Segment> segments) {
+    for (Segment segment: segments) {
+      dataMapFactory.clear(segment);
     }
   }
 
@@ -170,21 +171,21 @@ public final class TableDataMap extends OperationEventListener {
   /**
    * Method to prune the segments based on task min/max values
    *
-   * @param segmentIds
+   * @param segments
    * @param filterExp
    * @return
    * @throws IOException
    */
-  public List<String> pruneSegments(List<String> segmentIds, FilterResolverIntf filterExp)
+  public List<String> pruneSegments(List<Segment> segments, FilterResolverIntf filterExp)
       throws IOException {
     List<String> prunedSegments = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (String segmentId : segmentIds) {
-      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId);
+    for (Segment segment : segments) {
+      List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
       for (DataMap dataMap : dataMaps) {
         if (dataMap.isScanRequired(filterExp)) {
           // If any one task in a given segment contains the data that means the segment need to
           // be scanned and we need to validate further data maps in the same segment
-          prunedSegments.add(segmentId);
+          prunedSegments.add(segment.getSegmentNo());
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
index 16be1ac..f3642d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.carbondata.core.indexstore.Blocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
@@ -50,7 +51,7 @@ public interface DataMap {
    * @param filterExp
    * @return
    */
-  List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions);
+  List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions);
 
   // TODO Move this method to Abstract class
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index f5a7404..40cd436 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.events.Event;
 
@@ -37,12 +38,12 @@ public interface DataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  DataMapWriter createWriter(String segmentId);
+  DataMapWriter createWriter(Segment segment);
 
   /**
    * Get the datamap for segmentid
    */
-  List<DataMap> getDataMaps(String segmentId) throws IOException;
+  List<DataMap> getDataMaps(Segment segment) throws IOException;
 
   /**
    * Get datamaps for distributable object.
@@ -53,7 +54,7 @@ public interface DataMapFactory {
    * Get all distributable objects of a segmentid
    * @return
    */
-  List<DataMapDistributable> toDistributable(String segmentId);
+  List<DataMapDistributable> toDistributable(Segment segment);
 
   /**
    *
@@ -64,7 +65,7 @@ public interface DataMapFactory {
   /**
    * Clears datamap of the segment
    */
-  void clear(String segmentId);
+  void clear(Segment segment);
 
   /**
    * Clear all datamaps from memory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 927cef5..68eaa21 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -412,15 +413,21 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
 
   @Override
   public boolean createNewFile(String filePath, FileFactory.FileType fileType, boolean doAs,
-      final FsPermission permission) throws IOException {
+      FsPermission permission) throws IOException {
     filePath = filePath.replace("\\", "/");
     Path path = new Path(filePath);
     FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
-    boolean result = fs.createNewFile(path);
-    if (null != permission) {
-      fs.setPermission(path, permission);
+    if (fs.exists(path)) {
+      return false;
+    } else {
+      if (permission == null) {
+        permission = FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(fs.getConf()));
+      }
+      // Pass the permissions during file creation itself
+      fs.create(path, permission, false, fs.getConf().getInt("io.file.buffer.size", 4096),
+          fs.getDefaultReplication(path), fs.getDefaultBlockSize(path), null).close();
+      return true;
     }
-    return result;
   }
 
   @Override public boolean deleteFile(String filePath, FileFactory.FileType fileType)
@@ -453,11 +460,15 @@ public abstract  class AbstractDFSCarbonFile implements CarbonFile {
     filePath = filePath.replace("\\", "/");
     Path path = new Path(filePath);
     FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
-    if (fs.createNewFile(path)) {
-      fs.deleteOnExit(path);
+    if (fs.exists(path)) {
+      return false;
+    } else {
+      // Pass the permissions during file creation itself
+      fs.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false,
+          fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(path),
+          fs.getDefaultBlockSize(path), null).close();
       return true;
     }
-    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 111a7a2..c6073d5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -28,14 +28,20 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CarbonLRUCache;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapModel;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.PartitionMapFileStore;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 
 /**
  * Class to handle loading, unloading,clearing,storing of the table
@@ -75,22 +81,10 @@ public class BlockletDataMapIndexStore
     BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
     if (dataMap == null) {
       try {
-        String segmentPath = CarbonTablePath.getSegmentPath(
-            identifier.getAbsoluteTableIdentifier().getTablePath(),
-            identifier.getSegmentId());
-        Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
-        CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
-        CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles();
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
-        indexFileStore.readAllIIndexOfSegment(carbonFiles);
-        PartitionMapFileStore partitionFileStore = new PartitionMapFileStore();
-        partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath);
-        for (CarbonFile file : carbonFiles) {
-          blockMetaInfoMap
-              .put(file.getAbsolutePath(), new BlockMetaInfo(file.getLocations(), file.getSize()));
-        }
-        dataMap =
-            loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap);
+        Map<String, BlockMetaInfo> blockMetaInfoMap =
+            getBlockMetaInfoMap(identifier, indexFileStore);
+        dataMap = loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap);
       } catch (MemoryException e) {
         LOGGER.error("memory exception when loading datamap: " + e.getMessage());
         throw new RuntimeException(e.getMessage(), e);
@@ -99,6 +93,47 @@ public class BlockletDataMapIndexStore
     return dataMap;
   }
 
+  private Map<String, BlockMetaInfo> getBlockMetaInfoMap(TableBlockIndexUniqueIdentifier identifier,
+      SegmentIndexFileStore indexFileStore) throws IOException {
+    if (identifier.getMergeIndexFileName() != null) {
+      CarbonFile indexMergeFile = FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getMergeIndexFileName());
+      if (indexMergeFile.exists()) {
+        indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { indexMergeFile });
+      }
+    }
+    if (indexFileStore.getFileData(identifier.getIndexFileName()) == null) {
+      indexFileStore.readAllIIndexOfSegment(new CarbonFile[] { FileFactory.getCarbonFile(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getIndexFileName()) });
+    }
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+    List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(
+        identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()));
+    for (DataFileFooter footer : indexInfo) {
+      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
+      blockMetaInfoMap.put(blockPath, createBlockMetaInfo(blockPath));
+    }
+    return blockMetaInfoMap;
+  }
+
+  private BlockMetaInfo createBlockMetaInfo(String carbonDataFile) throws IOException {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(carbonDataFile);
+    if (carbonFile instanceof AbstractDFSCarbonFile) {
+      RemoteIterator<LocatedFileStatus> iter =
+          ((AbstractDFSCarbonFile)carbonFile).fs.listLocatedStatus(new Path(carbonDataFile));
+      LocatedFileStatus fileStatus = iter.next();
+      String[] location = fileStatus.getBlockLocations()[0].getHosts();
+      long len = fileStatus.getLen();
+      return new BlockMetaInfo(location, len);
+    } else {
+      return new BlockMetaInfo(new String[]{"localhost"}, carbonFile.getSize());
+    }
+  }
+
   @Override
   public List<BlockletDataMap> getAll(
       List<TableBlockIndexUniqueIdentifier> tableSegmentUniqueIdentifiers) throws IOException {
@@ -116,34 +151,13 @@ public class BlockletDataMapIndexStore
         }
       }
       if (missedIdentifiers.size() > 0) {
-        Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new HashMap<>();
-        Map<String, PartitionMapFileStore> partitionFileStoreMap = new HashMap<>();
-        Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
+        SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
 
         for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
-          SegmentIndexFileStore indexFileStore =
-              segmentIndexFileStoreMap.get(identifier.getSegmentId());
-          PartitionMapFileStore partitionFileStore =
-              partitionFileStoreMap.get(identifier.getSegmentId());
-          String segmentPath = CarbonTablePath.getSegmentPath(
-              identifier.getAbsoluteTableIdentifier().getTablePath(),
-              identifier.getSegmentId());
-          if (indexFileStore == null) {
-            CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
-            CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles();
-            indexFileStore = new SegmentIndexFileStore();
-            indexFileStore.readAllIIndexOfSegment(carbonFiles);
-            segmentIndexFileStoreMap.put(identifier.getSegmentId(), indexFileStore);
-            partitionFileStore = new PartitionMapFileStore();
-            partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath);
-            partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore);
-            for (CarbonFile file : carbonFiles) {
-              blockMetaInfoMap.put(FileFactory.getUpdatedFilePath(file.getAbsolutePath()),
-                  new BlockMetaInfo(file.getLocations(), file.getSize()));
-            }
-          }
+          Map<String, BlockMetaInfo> blockMetaInfoMap =
+              getBlockMetaInfoMap(identifier, indexFileStore);
           blockletDataMaps.add(
-              loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap));
+              loadAndGetDataMap(identifier, indexFileStore, blockMetaInfoMap));
         }
       }
     } catch (Throwable e) {
@@ -194,7 +208,6 @@ public class BlockletDataMapIndexStore
   private BlockletDataMap loadAndGetDataMap(
       TableBlockIndexUniqueIdentifier identifier,
       SegmentIndexFileStore indexFileStore,
-      PartitionMapFileStore partitionFileStore,
       Map<String, BlockMetaInfo> blockMetaInfoMap)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
@@ -206,10 +219,10 @@ public class BlockletDataMapIndexStore
     BlockletDataMap dataMap;
     synchronized (lock) {
       dataMap = new BlockletDataMap();
-      dataMap.init(new BlockletDataMapModel(identifier.getFilePath(),
-          indexFileStore.getFileData(identifier.getCarbonIndexFileName()),
-          partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()),
-          partitionFileStore.isPartionedSegment(), blockMetaInfoMap));
+      dataMap.init(new BlockletDataMapModel(
+          identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getIndexFileName(), indexFileStore.getFileData(identifier.getIndexFileName()),
+          blockMetaInfoMap, identifier.getSegmentId()));
       lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
           dataMap.getMemorySize());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 21ecba1..b4d6db2 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.indexstore;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.carbondata.core.datamap.Segment;
+
 /**
  * Fetches the detailed blocklet which has more information to execute the query
  */
@@ -28,20 +30,20 @@ public interface BlockletDetailsFetcher {
    * Get the blocklet detail information based on blockletid, blockid and segmentid.
    *
    * @param blocklets
-   * @param segmentId
+   * @param segment
    * @return
    * @throws IOException
    */
-  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId)
+  List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
       throws IOException;
 
   /**
    * Get the blocklet detail information based on blockletid, blockid and segmentid.
    *
    * @param blocklet
-   * @param segmentId
+   * @param segment
    * @return
    * @throws IOException
    */
-  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId) throws IOException;
+  ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
new file mode 100644
index 0000000..87c875e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/PartitionSpec.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Holds partition information.
+ */
+public class PartitionSpec implements Serializable {
+
+  private static final long serialVersionUID = 4828007433384867678L;
+
+  /**
+   * It holds the partition information in columnName=partitionValue combination.
+   */
+  private List<String> partitions;
+
+  private transient Path locationPath;
+
+  private String location;
+
+  private String uuid;
+
+  public PartitionSpec(List<String> partitions, String location) {
+    this.partitions = partitions;
+    this.locationPath = new Path(FileFactory.getUpdatedFilePath(location));
+    this.location = locationPath.toString();
+  }
+
+  public PartitionSpec(List<String> partitions, URI location) {
+    this.partitions = partitions;
+    this.locationPath = new Path(FileFactory.getUpdatedFilePath(new Path(location).toString()));
+    this.location = locationPath.toString();
+  }
+
+  public List<String> getPartitions() {
+    return partitions;
+  }
+
+  public Path getLocation() {
+    if (locationPath == null) {
+      locationPath = new Path(location);
+    }
+    return locationPath;
+  }
+
+  public String getUuid() {
+    return uuid;
+  }
+
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    PartitionSpec spec = (PartitionSpec) o;
+    return Objects.equals(getLocation(), spec.getLocation());
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(getLocation()); // transient locationPath is null after deserialization
+  }
+
+  @Override public String toString() {
+    return "PartitionSpec{" + "partitions=" + partitions + ", locationPath=" + locationPath
+        + ", location='" + location + '\'' + '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
index 18357ac..c907fa8 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/TableBlockIndexUniqueIdentifier.java
@@ -17,91 +17,66 @@
 
 package org.apache.carbondata.core.indexstore;
 
+import java.util.Objects;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 
 /**
- * Class holds the absoluteTableIdentifier and segmentId to uniquely identify a segment
+ * Class holds the indexFile information to uniquely identify the carbon index
  */
 public class TableBlockIndexUniqueIdentifier {
-  /**
-   * table fully qualified identifier
-   */
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
 
-  private String segmentId;
+  private String indexFilePath;
 
-  private String carbonIndexFileName;
+  private String indexFileName;
 
-  /**
-   * Constructor to initialize the class instance
-   *
-   * @param absoluteTableIdentifier
-   * @param segmentId
-   */
-  public TableBlockIndexUniqueIdentifier(AbsoluteTableIdentifier absoluteTableIdentifier,
-      String segmentId, String carbonIndexFileName) {
-    this.absoluteTableIdentifier = absoluteTableIdentifier;
+  private String mergeIndexFileName;
+
+  private String segmentId;
+
+  public TableBlockIndexUniqueIdentifier(String indexFilePath, String indexFileName,
+      String mergeIndexFileName, String segmentId) {
+    this.indexFilePath = indexFilePath;
+    this.indexFileName = indexFileName;
+    this.mergeIndexFileName = mergeIndexFileName;
     this.segmentId = segmentId;
-    this.carbonIndexFileName = carbonIndexFileName;
   }
 
   /**
-   * returns AbsoluteTableIdentifier
+   * method returns the id to uniquely identify a key
    *
    * @return
    */
-  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
-    return absoluteTableIdentifier;
+  public String getUniqueTableSegmentIdentifier() {
+    return indexFilePath + CarbonCommonConstants.FILE_SEPARATOR + indexFileName;
   }
 
-  public String getSegmentId() {
-    return segmentId;
+  public String getIndexFilePath() {
+    return indexFilePath;
   }
 
-  /**
-   * method returns the id to uniquely identify a key
-   *
-   * @return
-   */
-  public String getUniqueTableSegmentIdentifier() {
-    CarbonTableIdentifier carbonTableIdentifier =
-        absoluteTableIdentifier.getCarbonTableIdentifier();
-    return carbonTableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
-        + carbonTableIdentifier.getTableName() + CarbonCommonConstants.UNDERSCORE
-        + carbonTableIdentifier.getTableId() + CarbonCommonConstants.FILE_SEPARATOR + segmentId
-        + CarbonCommonConstants.FILE_SEPARATOR + carbonIndexFileName;
+  public String getIndexFileName() {
+    return indexFileName;
+  }
+
+  public String getMergeIndexFileName() {
+    return mergeIndexFileName;
   }
 
-  public String getFilePath() {
-    return absoluteTableIdentifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + "/"
-        + carbonIndexFileName;
+  public String getSegmentId() {
+    return segmentId;
   }
 
   @Override public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
-
     TableBlockIndexUniqueIdentifier that = (TableBlockIndexUniqueIdentifier) o;
-
-    if (!absoluteTableIdentifier.equals(that.absoluteTableIdentifier)) {
-      return false;
-    }
-    if (!segmentId.equals(that.segmentId)) {
-      return false;
-    }
-    return carbonIndexFileName.equals(that.carbonIndexFileName);
+    return Objects.equals(indexFilePath, that.indexFilePath) && Objects
+        .equals(indexFileName, that.indexFileName) && Objects
+        .equals(mergeIndexFileName, that.mergeIndexFileName);
   }
 
   @Override public int hashCode() {
-    int result = absoluteTableIdentifier.hashCode();
-    result = 31 * result + segmentId.hashCode();
-    result = 31 * result + carbonIndexFileName.hashCode();
-    return result;
-  }
-
-  public String getCarbonIndexFileName() {
-    return carbonIndexFileName;
+    return Objects.hash(indexFilePath, indexFileName, mergeIndexFileName);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 699f9e1..9ec7a46 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
@@ -65,8 +66,10 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.xerial.snappy.Snappy;
 
 /**
@@ -111,7 +114,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private static int SCHEMA = 2;
 
-  private static int PARTITION_INFO = 3;
+  private static int INDEX_PATH = 3;
+
+  private static int INDEX_FILE_NAME = 4;
+
+  private static int SEGMENTID = 5;
 
   private UnsafeMemoryDMStore unsafeMemoryDMStore;
 
@@ -121,8 +128,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private int[] columnCardinality;
 
-  private boolean isPartitionedSegment;
-
   @Override
   public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
@@ -131,7 +136,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     List<DataFileFooter> indexInfo = fileFooterConverter
         .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
-    isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment();
+    Path path = new Path(blockletDataMapInfo.getFilePath());
+    byte[] filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+    byte[] fileName = path.getName().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
+    byte[] segmentId =
+        blockletDataMapInfo.getSegmentId().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
     DataMapRowImpl summaryRow = null;
     byte[] schemaBinary = null;
     // below 2 variables will be used for fetching the relative blocklet id. Relative blocklet ID
@@ -145,7 +154,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
         columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
         segmentProperties = new SegmentProperties(columnInTable, columnCardinality);
         createSchema(segmentProperties);
-        createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary);
+        createSummarySchema(segmentProperties, schemaBinary, filePath, fileName,
+            segmentId);
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
       BlockMetaInfo blockMetaInfo =
@@ -182,8 +192,10 @@ public class BlockletDataMap implements DataMap, Cacheable {
     if (null != unsafeMemorySummaryDMStore) {
       addTaskSummaryRowToUnsafeMemoryStore(
           summaryRow,
-          blockletDataMapInfo.getPartitions(),
-          schemaBinary);
+          schemaBinary,
+          filePath,
+          fileName,
+          segmentId);
       unsafeMemorySummaryDMStore.finishWriting();
     }
     LOGGER.info(
@@ -354,8 +366,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return summaryRow;
   }
 
-  private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow,
-      List<String> partitions, byte[] schemaBinary) throws IOException {
+  private void addTaskSummaryRowToUnsafeMemoryStore(DataMapRow summaryRow, byte[] schemaBinary,
+      byte[] filePath, byte[] fileName, byte[] segmentId) {
     // write the task summary info to unsafe memory store
     if (null != summaryRow) {
       // Add column schema , it is useful to generate segment properties in executor.
@@ -363,18 +375,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
       if (schemaBinary != null) {
         summaryRow.setByteArray(schemaBinary, SCHEMA);
       }
-      if (partitions != null && partitions.size() > 0) {
-        CarbonRowSchema[] minSchemas =
-            ((CarbonRowSchema.StructCarbonRowSchema) unsafeMemorySummaryDMStore
-                .getSchema()[PARTITION_INFO]).getChildSchemas();
-        DataMapRow partitionRow = new DataMapRowImpl(minSchemas);
-        for (int i = 0; i < partitions.size(); i++) {
-          partitionRow
-              .setByteArray(partitions.get(i).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS),
-                  i);
-        }
-        summaryRow.setRow(partitionRow, PARTITION_INFO);
-      }
+      summaryRow.setByteArray(filePath, INDEX_PATH);
+      summaryRow.setByteArray(fileName, INDEX_FILE_NAME);
+      summaryRow.setByteArray(segmentId, SEGMENTID);
       try {
         unsafeMemorySummaryDMStore.addIndexRowToUnsafe(summaryRow);
       } catch (Exception e) {
@@ -560,27 +563,25 @@ public class BlockletDataMap implements DataMap, Cacheable {
    * once per datamap. It stores datamap level max/min of each column and partition information of
    * datamap
    * @param segmentProperties
-   * @param partitions
    * @throws MemoryException
    */
-  private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions,
-      byte[] schemaBinary)
+  private void createSummarySchema(SegmentProperties segmentProperties, byte[] schemaBinary,
+      byte[] filePath, byte[] fileName, byte[] segmentId)
       throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>();
     getMinMaxSchema(segmentProperties, taskMinMaxSchemas);
     // for storing column schema
     taskMinMaxSchemas.add(
         new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, schemaBinary.length));
-    if (partitions != null && partitions.size() > 0) {
-      CarbonRowSchema[] mapSchemas = new CarbonRowSchema[partitions.size()];
-      for (int i = 0; i < mapSchemas.length; i++) {
-        mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
-      }
-      CarbonRowSchema mapSchema =
-          new CarbonRowSchema.StructCarbonRowSchema(DataTypes.createDefaultStructType(),
-              mapSchemas);
-      taskMinMaxSchemas.add(mapSchema);
-    }
+    // for storing file path
+    taskMinMaxSchemas.add(
+        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, filePath.length));
+    // for storing file name
+    taskMinMaxSchemas.add(
+        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, fileName.length));
+    // for storing segmentid
+    taskMinMaxSchemas.add(
+        new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, segmentId.length));
     unsafeMemorySummaryDMStore = new UnsafeMemoryDMStore(
         taskMinMaxSchemas.toArray(new CarbonRowSchema[taskMinMaxSchemas.size()]));
   }
@@ -660,22 +661,24 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return blocklets;
   }
 
-  @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
+  @Override
+  public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
     if (unsafeMemoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }
-    // First get the partitions which are stored inside datamap.
-    List<String> storedPartitions = getPartitions();
     // if it has partitioned datamap but there is no partitioned information stored, it means
     // partitions are dropped so return empty list.
-    if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
-      return new ArrayList<>();
-    }
-    if (storedPartitions != null && storedPartitions.size() > 0) {
+    if (partitions != null) {
+      // First get the index file details (path, file name, segment id) stored inside datamap.
+      String[] fileDetails = getFileDetails();
       // Check the exact match of partition information inside the stored partitions.
       boolean found = false;
-      if (partitions != null && partitions.size() > 0) {
-        found = partitions.containsAll(storedPartitions);
+      Path folderPath = new Path(fileDetails[0]);
+      for (PartitionSpec spec : partitions) {
+        if (folderPath.equals(spec.getLocation()) && isCorrectUUID(fileDetails, spec)) {
+          found = true;
+          break;
+        }
       }
       if (!found) {
         return new ArrayList<>();
@@ -685,6 +688,20 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return prune(filterExp);
   }
 
+  private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) {
+    boolean needToScan = false;
+    if (spec.getUuid() != null) {
+      String[] split = spec.getUuid().split("_");
+      if (split[0].equals(fileDetails[2]) && CarbonTablePath.DataFileUtil
+          .getTimeStampFromFileName(fileDetails[1]).equals(split[1])) {
+        needToScan = true;
+      }
+    } else {
+      needToScan = true;
+    }
+    return needToScan;
+  }
+
   /**
    * select the blocks based on column min and max value
    *
@@ -767,6 +784,23 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return blocklet;
   }
 
+  private String[] getFileDetails() {
+    try {
+      String[] fileDetails = new String[3];
+      DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
+      fileDetails[0] =
+          new String(unsafeRow.getByteArray(INDEX_PATH), CarbonCommonConstants.DEFAULT_CHARSET);
+      fileDetails[1] = new String(unsafeRow.getByteArray(INDEX_FILE_NAME),
+          CarbonCommonConstants.DEFAULT_CHARSET);
+      fileDetails[2] = new String(unsafeRow.getByteArray(SEGMENTID),
+          CarbonCommonConstants.DEFAULT_CHARSET);
+      return fileDetails;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
   /**
    * Binary search used to get the first tentative index row based on
    * search key
@@ -874,20 +908,6 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return dataMapRow;
   }
 
-  private List<String> getPartitions() {
-    DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
-    if (unsafeRow.getColumnCount() > PARTITION_INFO) {
-      List<String> partitions = new ArrayList<>();
-      DataMapRow row = unsafeRow.getRow(PARTITION_INFO);
-      for (int i = 0; i < row.getColumnCount(); i++) {
-        partitions.add(
-            new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
-      }
-      return partitions;
-    }
-    return null;
-  }
-
   private byte[] getColumnSchemaBinary() {
     DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
     return unsafeRow.getByteArray(SCHEMA);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
index 63f45b5..99e48a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapDistributable.java
@@ -31,8 +31,8 @@ public class BlockletDataMapDistributable extends DataMapDistributable {
    */
   private String filePath;
 
-  public BlockletDataMapDistributable(String indexFileName) {
-    this.filePath = indexFileName;
+  public BlockletDataMapDistributable(String indexFilePath) {
+    this.filePath = indexFilePath;
   }
 
   public String getFilePath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2e2cab5..5eb077f 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -65,30 +67,40 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public DataMapWriter createWriter(String segmentId) {
+  public DataMapWriter createWriter(Segment segment) {
     throw new UnsupportedOperationException("not implemented");
   }
 
   @Override
-  public List<DataMap> getDataMaps(String segmentId) throws IOException {
+  public List<DataMap> getDataMaps(Segment segment) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        getTableBlockIndexUniqueIdentifiers(segmentId);
+        getTableBlockIndexUniqueIdentifiers(segment);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
   }
 
   private List<TableBlockIndexUniqueIdentifier> getTableBlockIndexUniqueIdentifiers(
-      String segmentId) throws IOException {
+      Segment segment) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
-        segmentMap.get(segmentId);
+        segmentMap.get(segment.getSegmentNo());
     if (tableBlockIndexUniqueIdentifiers == null) {
       tableBlockIndexUniqueIdentifiers = new ArrayList<>();
-      String path = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
-      List<String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
-      for (int i = 0; i < indexFiles.size(); i++) {
+      Map<String, String> indexFiles;
+      if (segment.getSegmentFileName() == null) {
+        String path =
+            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
+        indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(path);
+      } else {
+        SegmentFileStore fileStore =
+            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+        indexFiles = fileStore.getIndexFiles();
+      }
+      for (Map.Entry<String, String> indexFileEntry: indexFiles.entrySet()) {
+        Path indexFile = new Path(indexFileEntry.getKey());
         tableBlockIndexUniqueIdentifiers.add(
-            new TableBlockIndexUniqueIdentifier(identifier, segmentId, indexFiles.get(i)));
+            new TableBlockIndexUniqueIdentifier(indexFile.getParent().toString(),
+                indexFile.getName(), indexFileEntry.getValue(), segment.getSegmentNo()));
       }
-      segmentMap.put(segmentId, tableBlockIndexUniqueIdentifiers);
+      segmentMap.put(segment.getSegmentNo(), tableBlockIndexUniqueIdentifiers);
     }
     return tableBlockIndexUniqueIdentifiers;
   }
@@ -99,7 +111,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
    * datamap.
    */
   @Override
-  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, String segmentId)
+  public List<ExtendedBlocklet> getExtendedBlocklets(List<Blocklet> blocklets, Segment segment)
       throws IOException {
     List<ExtendedBlocklet> detailedBlocklets = new ArrayList<>();
     // If it is already detailed blocklet then type cast and return same
@@ -110,7 +122,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
       return detailedBlocklets;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segmentId);
+        getTableBlockIndexUniqueIdentifiers(segment);
     // Retrieve each blocklets detail information from blocklet datamap
     for (Blocklet blocklet : blocklets) {
       detailedBlocklets.add(getExtendedBlocklet(identifiers, blocklet));
@@ -119,13 +131,13 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   }
 
   @Override
-  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, String segmentId)
+  public ExtendedBlocklet getExtendedBlocklet(Blocklet blocklet, Segment segment)
       throws IOException {
     if (blocklet instanceof ExtendedBlocklet) {
       return (ExtendedBlocklet) blocklet;
     }
     List<TableBlockIndexUniqueIdentifier> identifiers =
-        getTableBlockIndexUniqueIdentifiers(segmentId);
+        getTableBlockIndexUniqueIdentifiers(segment);
     return getExtendedBlocklet(identifiers, blocklet);
   }
 
@@ -133,7 +145,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
       Blocklet blocklet) throws IOException {
     String carbonIndexFileName = CarbonTablePath.getCarbonIndexFileName(blocklet.getPath());
     for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
-      if (identifier.getCarbonIndexFileName().equals(carbonIndexFileName)) {
+      if (identifier.getIndexFileName().equals(carbonIndexFileName)) {
         DataMap dataMap = cache.get(identifier);
         return ((BlockletDataMap) dataMap).getDetailedBlocklet(blocklet.getBlockletId());
       }
@@ -141,36 +153,51 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
     throw new IOException("Blocklet with blockid " + blocklet.getPath() + " not found ");
   }
 
-
   @Override
-  public List<DataMapDistributable> toDistributable(String segmentId) {
-    CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentId);
+  public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> distributables = new ArrayList<>();
-    for (int i = 0; i < carbonIndexFiles.length; i++) {
-      Path path = new Path(carbonIndexFiles[i].getPath());
-      try {
+    try {
+      CarbonFile[] carbonIndexFiles;
+      if (segment.getSegmentFileName() == null) {
+        carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(
+            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
+      } else {
+        SegmentFileStore fileStore =
+            new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+        Map<String, String> indexFiles = fileStore.getIndexFiles();
+        carbonIndexFiles = new CarbonFile[indexFiles.size()];
+        int i = 0;
+        for (String indexFile : indexFiles.keySet()) {
+          carbonIndexFiles[i++] = FileFactory.getCarbonFile(indexFile);
+        }
+      }
+      for (int i = 0; i < carbonIndexFiles.length; i++) {
+        Path path = new Path(carbonIndexFiles[i].getPath());
+
         FileSystem fs = path.getFileSystem(FileFactory.getConfiguration());
         RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
         LocatedFileStatus fileStatus = iter.next();
         String[] location = fileStatus.getBlockLocations()[0].getHosts();
         BlockletDataMapDistributable distributable =
-            new BlockletDataMapDistributable(path.getName());
+            new BlockletDataMapDistributable(path.toString());
         distributable.setLocations(location);
         distributables.add(distributable);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+
       }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
     return distributables;
   }
 
-  @Override public void fireEvent(Event event) {
+  @Override
+  public void fireEvent(Event event) {
 
   }
 
   @Override
-  public void clear(String segmentId) {
-    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
+  public void clear(Segment segment) {
+    List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segment.getSegmentNo());
     if (blockIndexes != null) {
       for (TableBlockIndexUniqueIdentifier blockIndex : blockIndexes) {
         DataMap dataMap = cache.getIfPresent(blockIndex);
@@ -185,7 +212,7 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   @Override
   public void clear() {
     for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
-      clear(segmentId);
+      clear(new Segment(segmentId, null));
     }
   }
 
@@ -193,18 +220,21 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
   public List<DataMap> getDataMaps(DataMapDistributable distributable) throws IOException {
     BlockletDataMapDistributable mapDistributable = (BlockletDataMapDistributable) distributable;
     List<TableBlockIndexUniqueIdentifier> identifiers = new ArrayList<>();
-    if (mapDistributable.getFilePath().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
-      identifiers.add(new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
-          mapDistributable.getFilePath()));
-    } else if (mapDistributable.getFilePath().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+    Path indexPath = new Path(mapDistributable.getFilePath());
+    String segmentNo = mapDistributable.getSegment().getSegmentNo();
+    if (indexPath.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+      String parent = indexPath.getParent().toString();
+      identifiers
+          .add(new TableBlockIndexUniqueIdentifier(parent, indexPath.getName(), null, segmentNo));
+    } else if (indexPath.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
       SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-      List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(
-          CarbonTablePath.getSegmentPath(identifier.getTablePath(), mapDistributable.getSegmentId())
-              + "/" + mapDistributable.getFilePath());
+      CarbonFile carbonFile = FileFactory.getCarbonFile(indexPath.toString());
+      String parentPath = carbonFile.getParentFile().getAbsolutePath();
+      List<String> indexFiles = fileStore.getIndexFilesFromMergeFile(carbonFile.getAbsolutePath());
       for (String indexFile : indexFiles) {
         identifiers.add(
-            new TableBlockIndexUniqueIdentifier(identifier, distributable.getSegmentId(),
-                indexFile));
+            new TableBlockIndexUniqueIdentifier(parentPath, indexFile, carbonFile.getName(),
+                segmentNo));
       }
     }
     List<DataMap> dataMaps;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index b3a7f8c..ebeb278 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -16,7 +16,6 @@
  */
 package org.apache.carbondata.core.indexstore.blockletindex;
 
-import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
@@ -29,35 +28,27 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private byte[] fileData;
 
-  private List<String> partitions;
+  private Map<String, BlockMetaInfo> blockMetaInfoMap;
 
-  private boolean partitionedSegment;
+  private String segmentId;
 
-  Map<String, BlockMetaInfo> blockMetaInfoMap;
-
-  public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions,
-      boolean partitionedSegment,
-      Map<String, BlockMetaInfo> blockMetaInfoMap) {
+  public BlockletDataMapModel(String filePath, byte[] fileData,
+      Map<String, BlockMetaInfo> blockMetaInfoMap, String segmentId) {
     super(filePath);
     this.fileData = fileData;
-    this.partitions = partitions;
-    this.partitionedSegment = partitionedSegment;
     this.blockMetaInfoMap = blockMetaInfoMap;
+    this.segmentId = segmentId;
   }
 
   public byte[] getFileData() {
     return fileData;
   }
 
-  public List<String> getPartitions() {
-    return partitions;
-  }
-
-  public boolean isPartitionedSegment() {
-    return partitionedSegment;
-  }
-
   public Map<String, BlockMetaInfo> getBlockMetaInfoMap() {
     return blockMetaInfoMap;
   }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index a30b04c..b88c1f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -28,10 +28,12 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.reader.CarbonIndexFileReader;
 import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
@@ -58,8 +60,14 @@ public class SegmentIndexFileStore {
    */
   private Map<String, byte[]> carbonIndexMap;
 
+  /**
+   * Stores the full path of each index file and the related binary file data in it.
+   */
+  private Map<String, byte[]> carbonIndexMapWithFullPath;
+
   public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
+    carbonIndexMapWithFullPath = new HashMap<>();
   }
 
   /**
@@ -80,6 +88,45 @@ public class SegmentIndexFileStore {
   }
 
   /**
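+   * Read the index files listed in the segment file and keep their content cached in it.
+   * Only locations whose status matches are read, unless ignoreStatus is set.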
+   * @param segmentFileStore
+   * @throws IOException
+   */
+  public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
+      boolean ignoreStatus) throws IOException {
+    List<CarbonFile> carbonIndexFiles = new ArrayList<>();
+    if (segmentFileStore.getLocationMap() == null) {
+      return;
+    }
+    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
+        .getLocationMap().entrySet()) {
+      String location = locations.getKey();
+
+      if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) {
+        if (locations.getValue().isRelative()) {
+          location =
+              segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+        }
+        for (String indexFile : locations.getValue().getFiles()) {
+          CarbonFile carbonFile = FileFactory
+              .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile);
+          if (carbonFile.exists()) {
+            carbonIndexFiles.add(carbonFile);
+          }
+        }
+      }
+    }
+    for (int i = 0; i < carbonIndexFiles.size(); i++) {
+      if (carbonIndexFiles.get(i).getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        readMergeFile(carbonIndexFiles.get(i).getCanonicalPath());
+      } else if (carbonIndexFiles.get(i).getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        readIndexFile(carbonIndexFiles.get(i));
+      }
+    }
+  }
+
+  /**
    * read index file and fill the blocklet information
    *
    * @param segmentPath
@@ -120,17 +167,22 @@ public class SegmentIndexFileStore {
    * @return
    * @throws IOException
    */
-  public List<String> getIndexFilesFromSegment(String segmentPath) throws IOException {
+  public Map<String, String> getIndexFilesFromSegment(String segmentPath) throws IOException {
     CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath);
-    Set<String> indexFiles = new HashSet<>();
+    Map<String, String> indexFiles = new HashMap<>();
     for (int i = 0; i < carbonIndexFiles.length; i++) {
       if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        indexFiles.addAll(getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath()));
+        List<String> indexFilesFromMergeFile =
+            getIndexFilesFromMergeFile(carbonIndexFiles[i].getCanonicalPath());
+        for (String file: indexFilesFromMergeFile) {
+          indexFiles.put(carbonIndexFiles[i].getParentFile().getAbsolutePath()
+              + CarbonCommonConstants.FILE_SEPARATOR + file, carbonIndexFiles[i].getName());
+        }
       } else if (carbonIndexFiles[i].getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
-        indexFiles.add(carbonIndexFiles[i].getName());
+        indexFiles.put(carbonIndexFiles[i].getAbsolutePath(), null);
       }
     }
-    return new ArrayList<>(indexFiles);
+    return indexFiles;
   }
 
   /**
@@ -156,16 +208,23 @@ public class SegmentIndexFileStore {
    */
   private void readMergeFile(String mergeFilePath) throws IOException {
     ThriftReader thriftReader = new ThriftReader(mergeFilePath);
-    thriftReader.open();
-    MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
-    MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
-    List<String> file_names = indexHeader.getFile_names();
-    List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
-    assert (file_names.size() == fileData.size());
-    for (int i = 0; i < file_names.size(); i++) {
-      carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
+    try {
+      thriftReader.open();
+      MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader);
+      MergedBlockIndex mergedBlockIndex = readMergeBlockIndex(thriftReader);
+      List<String> file_names = indexHeader.getFile_names();
+      List<ByteBuffer> fileData = mergedBlockIndex.getFileData();
+      CarbonFile mergeFile = FileFactory.getCarbonFile(mergeFilePath);
+      assert (file_names.size() == fileData.size());
+      for (int i = 0; i < file_names.size(); i++) {
+        carbonIndexMap.put(file_names.get(i), fileData.get(i).array());
+        carbonIndexMapWithFullPath.put(
+            mergeFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR
+                + file_names.get(i), fileData.get(i).array());
+      }
+    } finally {
+      thriftReader.close();
     }
-    thriftReader.close();
   }
 
   /**
@@ -181,6 +240,9 @@ public class SegmentIndexFileStore {
     byte[] bytes = new byte[(int) indexFile.getSize()];
     dataInputStream.readFully(bytes);
     carbonIndexMap.put(indexFile.getName(), bytes);
+    carbonIndexMapWithFullPath.put(
+        indexFile.getParentFile().getAbsolutePath() + CarbonCommonConstants.FILE_SEPARATOR
+            + indexFile.getName(), bytes);
     dataInputStream.close();
   }
 
@@ -253,6 +315,10 @@ public class SegmentIndexFileStore {
     return carbonIndexMap;
   }
 
+  public Map<String, byte[]> getCarbonIndexMapWithFullPath() {
+    return carbonIndexMapWithFullPath;
+  }
+
   /**
    * This method will read the index information from carbon index file
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index cc98b03..be98f7d 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 
@@ -102,21 +101,6 @@ public class HdfsFileLock extends AbstractCarbonLock {
         status = true;
       } catch (IOException e) {
         status = false;
-      } finally {
-        CarbonFile carbonFile =
-            FileFactory.getCarbonFile(location, FileFactory.getFileType(location));
-        if (carbonFile.exists()) {
-          if (carbonFile.delete()) {
-            LOGGER.info("Deleted the lock file " + location);
-          } else {
-            LOGGER.error("Not able to delete the lock file " + location);
-            status = false;
-          }
-        } else {
-          LOGGER.error("Not able to delete the lock file because "
-              + "it is not existed in location " + location);
-          status = false;
-        }
       }
     }
     return status;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 4fee4c4..75ea074 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -26,7 +26,6 @@ import java.nio.channels.OverlappingFileLockException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 
@@ -121,7 +120,6 @@ public class LocalFileLock extends AbstractCarbonLock {
       LOGGER.info(e.getMessage());
       return false;
     }
-
   }
 
   /**
@@ -130,27 +128,18 @@ public class LocalFileLock extends AbstractCarbonLock {
    * @return
    */
   @Override public boolean unlock() {
-    boolean status;
+    boolean status = false;
     try {
       if (null != fileLock) {
         fileLock.release();
+        status = true;
       }
-      status = true;
     } catch (IOException e) {
       status = false;
     } finally {
       if (null != fileOutputStream) {
         try {
           fileOutputStream.close();
-          // deleting the lock file after releasing the lock.
-          CarbonFile lockFile = FileFactory
-              .getCarbonFile(lockFilePath, FileFactory.getFileType(lockFilePath));
-          if (!lockFile.exists() || lockFile.delete()) {
-            LOGGER.info("Successfully deleted the lock file " + lockFilePath);
-          } else {
-            LOGGER.error("Not able to delete the lock file " + lockFilePath);
-            status = false;
-          }
         } catch (IOException e) {
           LOGGER.error(e.getMessage());
         }
@@ -158,5 +147,4 @@ public class LocalFileLock extends AbstractCarbonLock {
     }
     return status;
   }
-
 }