You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:41:55 UTC

[08/50] [abbrv] incubator-carbondata git commit: Fixed driver btree performance issue (#729)

Fixed driver btree performance issue (#729)

LGTM

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/82332b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/82332b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/82332b0e

Branch: refs/heads/master
Commit: 82332b0e4381ede8a39789ccf1d917af0c89ab39
Parents: be46423
Author: Kumar Vishal <ku...@gmail.com>
Authored: Sat Jun 25 16:05:07 2016 +0800
Committer: sujith71955 <su...@gmail.com>
Committed: Sat Jun 25 16:05:07 2016 +0800

----------------------------------------------------------------------
 .../carbon/datastore/SegmentTaskIndexStore.java |  21 ++-
 .../carbon/metadata/index/BlockIndexInfo.java   |  92 ++++++++++++
 .../core/carbon/path/CarbonTablePath.java       |  74 +++++++---
 .../core/reader/CarbonIndexFileReader.java      |  95 ++++++++++++
 .../carbondata/core/reader/ThriftReader.java    |  47 ++++--
 .../core/util/CarbonMetadataUtil.java           |  78 ++++++++--
 .../org/carbondata/core/util/CarbonUtil.java    |  35 +++++
 .../core/writer/CarbonIndexFileWriter.java      |  64 +++++++++
 .../query/util/DataFileFooterConverter.java     |  44 ++++++
 .../datastore/SegmentTaskIndexStoreTest.java    | 143 -------------------
 format/src/main/thrift/carbondataindex.thrift   |  45 ++++++
 .../org/carbondata/hadoop/CarbonPathFilter.java |   4 +-
 .../store/writer/AbstractFactDataWriter.java    | 135 ++++++++++++++---
 13 files changed, 653 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
index fde062c..28a892e 100644
--- a/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -116,6 +116,7 @@ public class SegmentTaskIndexStore {
         addTableSegmentMap(absoluteTableIdentifier);
     Map<String, AbstractIndex> taskIdToSegmentIndexMap = null;
     String segmentId = null;
+    String taskId = null;
     try {
       while (iteratorOverSegmentBlocksInfos.hasNext()) {
         // segment id to table block mapping
@@ -147,8 +148,9 @@ public class SegmentTaskIndexStore {
                   taskIdToTableBlockInfoMap.entrySet().iterator();
               while (iterator.hasNext()) {
                 Entry<String, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
-                taskIdToSegmentIndexMap
-                    .put(taskToBlockInfoList.getKey(), loadBlocks(taskToBlockInfoList.getValue()));
+                taskId = taskToBlockInfoList.getKey();
+                taskIdToSegmentIndexMap.put(taskId,
+                    loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier));
               }
               // removing from segment lock map as once segment is loaded
               //if concurrent query is coming for same segment
@@ -231,19 +233,12 @@ public class SegmentTaskIndexStore {
    * @return loaded segment
    * @throws CarbonUtilException
    */
-  private AbstractIndex loadBlocks(List<TableBlockInfo> tableBlockInfoList)
-      throws CarbonUtilException {
-    DataFileFooter footer = null;
+  private AbstractIndex loadBlocks(String taskId, List<TableBlockInfo> tableBlockInfoList,
+      AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException {
     // all the block of one task id will be loaded together
     // so creating a list which will have all the data file meta data to of one task
-    List<DataFileFooter> footerList = new ArrayList<DataFileFooter>();
-    for (TableBlockInfo tableBlockInfo : tableBlockInfoList) {
-      footer = CarbonUtil
-          .readMetadatFile(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset(),
-              tableBlockInfo.getBlockLength());
-      footer.setTableBlockInfo(tableBlockInfo);
-      footerList.add(footer);
-    }
+    List<DataFileFooter> footerList =
+        CarbonUtil.readCarbonIndexFile(taskId, tableBlockInfoList, tableIdentifier);
     AbstractIndex segment = new SegmentTaskIndex();
     // file path of only first block is passed as it all table block info path of
     // same task id will be same

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java b/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java
new file mode 100644
index 0000000..bfed3dd
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/carbon/metadata/index/BlockIndexInfo.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.carbon.metadata.index;
+
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
+
+/**
+ * Below class will be used hold the information
+ * about block index
+ */
+public class BlockIndexInfo {
+
+  /**
+   * total number of rows present in the file
+   */
+  private long numberOfRows;
+
+  /**
+   * file name
+   */
+  private String fileName;
+
+  /**
+   * offset of metadata in data file
+   */
+  private long offset;
+
+  /**
+   * to store min max and start and end key
+   */
+  private BlockletIndex blockletIndex;
+
+  /**
+   * Constructor
+   *
+   * @param numberOfRows  number of rows
+   * @param fileName      full qualified name
+   * @param offset        offset of the metadata in data file
+   * @param blockletIndex block let index
+   */
+  public BlockIndexInfo(long numberOfRows, String fileName, long offset,
+      BlockletIndex blockletIndex) {
+    this.numberOfRows = numberOfRows;
+    this.fileName = fileName;
+    this.offset = offset;
+    this.blockletIndex = blockletIndex;
+  }
+
+  /**
+   * @return the numberOfRows
+   */
+  public long getNumberOfRows() {
+    return numberOfRows;
+  }
+
+  /**
+   * @return the fileName
+   */
+  public String getFileName() {
+    return fileName;
+  }
+
+  /**
+   * @return the offset
+   */
+  public long getOffset() {
+    return offset;
+  }
+
+  /**
+   * @return the blockletIndex
+   */
+  public BlockletIndex getBlockletIndex() {
+    return blockletIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
index 31bc464..8dcd207 100644
--- a/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/carbondata/core/carbon/path/CarbonTablePath.java
@@ -22,11 +22,14 @@ import java.io.File;
 
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
 
 import static org.carbondata.core.constants.CarbonCommonConstants.INVALID_SEGMENT_ID;
 
 import org.apache.hadoop.fs.Path;
 
+
 /**
  * Helps to get Table content paths.
  */
@@ -44,6 +47,7 @@ public class CarbonTablePath extends Path {
   private static final String PARTITION_PREFIX = "Part";
   private static final String CARBON_DATA_EXT = ".carbondata";
   private static final String DATA_PART_PREFIX = "part";
+  private static final String INDEX_FILE_EXT = ".carbonindex";
 
   private String tablePath;
 
@@ -62,13 +66,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * gets table path
-   */
-  public String getPath() {
-    return tablePath;
-  }
-
-  /**
    * @param columnId unique column identifier
    * @return name of dictionary file
    */
@@ -78,6 +75,7 @@ public class CarbonTablePath extends Path {
 
   /**
    * whether carbonFile is dictionary file or not
+   *
    * @param carbonFile
    * @return
    */
@@ -86,6 +84,27 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * check if it is carbon data file matching extension
+   *
+   * @param fileNameWithPath
+   * @return boolean
+   */
+  public static boolean isCarbonDataFile(String fileNameWithPath) {
+    int pos = fileNameWithPath.lastIndexOf('.');
+    if (pos != -1) {
+      return fileNameWithPath.substring(pos).startsWith(CARBON_DATA_EXT);
+    }
+    return false;
+  }
+
+  /**
+   * gets table path
+   */
+  public String getPath() {
+    return tablePath;
+  }
+
+  /**
    * @param columnId unique column identifier
    * @return absolute path of dictionary file
    */
@@ -148,6 +167,29 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * Below method will be used to get the index file present in the segment folder
+   * based on task id
+   *
+   * @param taskId      task id of the file
+   * @param partitionId partition number
+   * @param segmentId   segment number
+   * @return full qualified carbon index path
+   */
+  public String getCarbonIndexFilePath(final String taskId, final String partitionId,
+      final String segmentId) {
+    String segmentDir = getSegmentDir(partitionId, segmentId);
+    CarbonFile carbonFile =
+        FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
+
+    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().startsWith(taskId) && file.getName().endsWith(INDEX_FILE_EXT);
+      }
+    });
+    return files[0].getAbsolutePath();
+  }
+
+  /**
    * Gets absolute path of data file
    *
    * @param partitionId unique partition identifier
@@ -189,16 +231,14 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * check if it is carbon data file matching extension
-   * @param fileNameWithPath
-   * @return boolean
+   * Below method will be used to get the carbon index filename
+   *
+   * @param taskNo               task number
+   * @param factUpdatedTimeStamp time stamp
+   * @return filename
    */
-  public static boolean isCarbonDataFile(String fileNameWithPath) {
-    int pos = fileNameWithPath.lastIndexOf('.');
-    if( pos != -1 ) {
-      return fileNameWithPath.substring(pos).startsWith(CARBON_DATA_EXT);
-    }
-    return false;
+  public String getCarbonIndexFileName(int taskNo, String factUpdatedTimeStamp) {
+    return taskNo + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT;
   }
 
   private String getSegmentDir(String partitionId, String segmentId) {
@@ -234,7 +274,7 @@ public class CarbonTablePath extends Path {
     if (!(o instanceof CarbonTablePath)) {
       return false;
     }
-    CarbonTablePath path = (CarbonTablePath)o;
+    CarbonTablePath path = (CarbonTablePath) o;
     return tablePath.equals(path.tablePath) && super.equals(o);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java b/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java
new file mode 100644
index 0000000..bb18e9f
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/reader/CarbonIndexFileReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.reader;
+
+import java.io.IOException;
+
+import org.carbondata.format.BlockIndex;
+import org.carbondata.format.IndexHeader;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Reader class which will be used to read the index file
+ */
+public class CarbonIndexFileReader {
+
+  /**
+   * reader
+   */
+  private ThriftReader thriftReader;
+
+  /**
+   * Below method will be used to read the index header
+   *
+   * @return index header
+   * @throws IOException if any problem  while reader the header
+   */
+  public IndexHeader readIndexHeader() throws IOException {
+    IndexHeader indexHeader = (IndexHeader) thriftReader.read(new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new IndexHeader();
+      }
+    });
+    return indexHeader;
+  }
+
+  /**
+   * Below method will be used to close the reader
+   */
+  public void closeThriftReader() {
+    thriftReader.close();
+  }
+
+  /**
+   * Below method will be used to read the block index from fie
+   *
+   * @return block index info
+   * @throws IOException if problem while reading the block index
+   */
+  public BlockIndex readBlockIndexInfo() throws IOException {
+    BlockIndex blockInfo = (BlockIndex) thriftReader.read(new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new BlockIndex();
+      }
+    });
+    return blockInfo;
+  }
+
+  /**
+   * Open the thrift reader
+   *
+   * @param filePath
+   * @throws IOException
+   */
+  public void openThriftReader(String filePath) throws IOException {
+    thriftReader = new ThriftReader(filePath);
+    thriftReader.open();
+  }
+
+  /**
+   * check if any more object is present
+   *
+   * @return true if any more object can be read
+   * @throws IOException
+   */
+  public boolean hasNext() throws IOException {
+    return thriftReader.hasNext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
index 3683a2b..e659919 100644
--- a/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/carbondata/core/reader/ThriftReader.java
@@ -35,34 +35,21 @@ import org.apache.thrift.transport.TIOStreamTransport;
  */
 public class ThriftReader {
   /**
-   * Thrift deserializes by taking an existing object and populating it. ThriftReader
-   * needs a way of obtaining instances of the class to be populated and this interface
-   * defines the mechanism by which a client provides these instances.
-   */
-  public static interface TBaseCreator {
-    TBase create();
-  }
-
-  /**
    * buffer size
    */
   private static final int bufferSize = 2048;
-
   /**
    * File containing the objects.
    */
   private String fileName;
-
   /**
    * Used to create empty objects that will be initialized with values from the fileName.
    */
-  private final TBaseCreator creator;
-
+  private TBaseCreator creator;
   /**
    * For reading the fileName.
    */
   private DataInputStream dataInputStream;
-
   /**
    * For reading the binary thrift objects.
    */
@@ -77,6 +64,13 @@ public class ThriftReader {
   }
 
   /**
+   * Constructor.
+   */
+  public ThriftReader(String fileName) {
+    this.fileName = fileName;
+  }
+
+  /**
    * Opens the fileName for reading.
    */
   public void open() throws IOException {
@@ -118,9 +112,34 @@ public class ThriftReader {
   }
 
   /**
+   * Reads the next object from the fileName.
+   *
+   * @param creator type of object which will be returned
+   * @throws IOException any problem while reading
+   */
+  public TBase read(TBaseCreator creator) throws IOException {
+    TBase t = creator.create();
+    try {
+      t.read(binaryIn);
+    } catch (TException e) {
+      throw new IOException(e);
+    }
+    return t;
+  }
+
+  /**
    * Close the fileName.
    */
   public void close() {
     CarbonUtil.closeStreams(dataInputStream);
   }
+
+  /**
+   * Thrift deserializes by taking an existing object and populating it. ThriftReader
+   * needs a way of obtaining instances of the class to be populated and this interface
+   * defines the mechanism by which a client provides these instances.
+   */
+  public static interface TBaseCreator {
+    TBase create();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
index 5b9c291..5e8e8a2 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonMetadataUtil.java
@@ -11,10 +11,12 @@ import java.util.List;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.metadata.index.BlockIndexInfo;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.compression.ValueCompressionModel;
 import org.carbondata.core.metadata.BlockletInfoColumnar;
 import org.carbondata.core.metadata.ValueEncoderMeta;
+import org.carbondata.format.BlockIndex;
 import org.carbondata.format.BlockletBTreeIndex;
 import org.carbondata.format.BlockletIndex;
 import org.carbondata.format.BlockletInfo;
@@ -25,6 +27,7 @@ import org.carbondata.format.CompressionCodec;
 import org.carbondata.format.DataChunk;
 import org.carbondata.format.Encoding;
 import org.carbondata.format.FileFooter;
+import org.carbondata.format.IndexHeader;
 import org.carbondata.format.PresenceMeta;
 import org.carbondata.format.SegmentInfo;
 import org.carbondata.format.SortState;
@@ -68,6 +71,23 @@ public class CarbonMetadataUtil {
     return footer;
   }
 
+  private static BlockletIndex getBlockletIndex(
+      org.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex info) {
+    BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+
+    for (int i = 0; i < info.getMinMaxIndex().getMaxValues().length; i++) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(info.getMinMaxIndex().getMaxValues()[i]));
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(info.getMinMaxIndex().getMinValues()[i]));
+    }
+    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+    blockletBTreeIndex.setStart_key(info.getBtreeIndex().getStartKey());
+    blockletBTreeIndex.setEnd_key(info.getBtreeIndex().getEndKey());
+    BlockletIndex blockletIndex = new BlockletIndex();
+    blockletIndex.setMin_max_index(blockletMinMaxIndex);
+    blockletIndex.setB_tree_index(blockletBTreeIndex);
+    return blockletIndex;
+  }
+
   /**
    * Get total number of rows for the file.
    *
@@ -334,21 +354,53 @@ public class CarbonMetadataUtil {
         min[j] = minMaxIndexList.getMin_values().get(j).array();
         max[j] = minMaxIndexList.getMax_values().get(j).array();
       }
-
-      //      byte[][] min = new byte[minMaxIndexList.getMin_values().size()][];
-      //      List<ByteBuffer> minValues = minMaxIndexList.getMin_values();
-      //      for (int j = 0; j < minValues.size(); j++) {
-      //        min[j] = minValues.get(j).array();
-      //      }
-      //      listOfNodeInfo.get(i).setColumnMinData(min);
-      //
-      //      byte[][] max = new byte[minMaxIndexList.getMax_values().size()][];
-      //      List<ByteBuffer> maxValues = minMaxIndexList.getMax_values();
-      //      for (int j = 0; j < maxValues.size(); j++) {
-      //        max[j] = maxValues.get(j).array();
-      //    }
       listOfNodeInfo.get(i).setColumnMaxData(max);
     }
   }
 
+  /**
+   * Below method will be used to get the index header
+   *
+   * @param columnCardinality cardinality of each column
+   * @param columnSchemaList  list of column present in the table
+   * @return Index header object
+   */
+  public static IndexHeader getIndexHeader(int[] columnCardinality,
+      List<ColumnSchema> columnSchemaList) {
+    // create segment info object
+    SegmentInfo segmentInfo = new SegmentInfo();
+    // set the number of columns
+    segmentInfo.setNum_cols(columnSchemaList.size());
+    // setting the column cardinality
+    segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
+    // create index header object
+    IndexHeader indexHeader = new IndexHeader();
+    // set the segment info
+    indexHeader.setSegment_info(segmentInfo);
+    // set the column names
+    indexHeader.setTable_columns(columnSchemaList);
+    return indexHeader;
+  }
+
+  /**
+   * Below method will be used to get the block index info thrift object for each block
+   * present in the segment
+   *
+   * @param blockIndexInfoList block index info list
+   * @return list of block index
+   */
+  public static List<BlockIndex> getBlockIndexInfo(List<BlockIndexInfo> blockIndexInfoList) {
+    List<BlockIndex> thriftBlockIndexList = new ArrayList<BlockIndex>();
+    BlockIndex blockIndex = null;
+    // below code to create block index info object for each block
+    for (BlockIndexInfo blockIndexInfo : blockIndexInfoList) {
+      blockIndex = new BlockIndex();
+      blockIndex.setNum_rows(blockIndexInfo.getNumberOfRows());
+      blockIndex.setOffset(blockIndexInfo.getNumberOfRows());
+      blockIndex.setFile_name(blockIndexInfo.getFileName());
+      blockIndex.setBlock_index(getBlockletIndex(blockIndexInfo.getBlockletIndex()));
+      thriftBlockIndexList.add(blockIndex);
+    }
+    return thriftBlockIndexList;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
index 58e6f42..427fb97 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonUtil.java
@@ -48,12 +48,16 @@ import java.util.concurrent.Executors;
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.cache.dictionary.Dictionary;
+import org.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 import org.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
 import org.carbondata.core.carbon.metadata.datatype.DataType;
 import org.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.carbondata.core.carbon.path.CarbonStorePath;
+import org.carbondata.core.carbon.path.CarbonTablePath;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.datastorage.store.FileHolder;
 import org.carbondata.core.datastorage.store.columnar.ColumnarKeyStoreDataHolder;
@@ -1683,5 +1687,36 @@ public final class CarbonUtil {
     }
   }
 
+  /**
+   * Below method will be used to get all the block index info from index file
+   *
+   * @param taskId                  task id of the file
+   * @param tableBlockInfoList      list of table block
+   * @param absoluteTableIdentifier absolute table identifier
+   * @return list of block info
+   * @throws CarbonUtilException if any problem while reading
+   */
+  public static List<DataFileFooter> readCarbonIndexFile(String taskId,
+      List<TableBlockInfo> tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier)
+      throws CarbonUtilException {
+    // need to sort the  block info list based for task in ascending  order so
+    // it will be sinkup with block index read from file
+    Collections.sort(tableBlockInfoList);
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+    // geting the index file path
+    //TODO need to pass proper partition number when partiton will be supported
+    String carbonIndexFilePath = carbonTablePath
+        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId());
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    try {
+      // read the index info and return
+      return fileFooterConverter.getIndexInfo(carbonIndexFilePath, tableBlockInfoList);
+    } catch (IOException e) {
+      throw new CarbonUtilException("Problem while reading the file metadata", e);
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java b/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
new file mode 100644
index 0000000..5ae7b33
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/writer/CarbonIndexFileWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.core.writer;
+
+import java.io.IOException;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Reader class which will be used to read the index file
+ */
+public class CarbonIndexFileWriter {
+
+  /**
+   * thrift writer object
+   */
+  private ThriftWriter thriftWriter;
+
+  /**
+   * It writes thrift object to file
+   *
+   * @param footer
+   * @throws IOException
+   */
+  public void writeThrift(TBase indexObject) throws IOException {
+    thriftWriter.write(indexObject);
+  }
+
+  /**
+   * Below method will be used to open the thrift writer
+   *
+   * @param filePath file path where data need to be written
+   * @throws IOException throws io exception in case of any failure
+   */
+  public void openThriftWriter(String filePath) throws IOException {
+    // create thrift writer instance
+    thriftWriter = new ThriftWriter(filePath, true);
+    // open the file stream
+    thriftWriter.open();
+  }
+
+  /**
+   * Below method will be used to close the thrift object
+   */
+  public void close() {
+    thriftWriter.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java b/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
index 7ca9a3f..fc23a0e 100644
--- a/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/carbondata/query/util/DataFileFooterConverter.java
@@ -29,6 +29,7 @@ import java.util.List;
 
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
 import org.carbondata.core.carbon.metadata.blocklet.SegmentInfo;
@@ -47,8 +48,10 @@ import org.carbondata.core.datastorage.store.FileHolder;
 import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.metadata.ValueEncoderMeta;
 import org.carbondata.core.reader.CarbonFooterReader;
+import org.carbondata.core.reader.CarbonIndexFileReader;
 import org.carbondata.core.util.ByteUtil;
 import org.carbondata.core.util.CarbonUtil;
+import org.carbondata.format.BlockIndex;
 import org.carbondata.format.FileFooter;
 
 /**
@@ -61,6 +64,47 @@ public class DataFileFooterConverter {
       LogServiceFactory.getLogService(DataFileFooterConverter.class.getName());
 
   /**
+   * Below method will be used to get the index info from index file
+   *
+   * @param filePath           file path of the index file
+   * @param tableBlockInfoList table block index
+   * @return list of index info
+   * @throws IOException problem while reading the index file
+   */
+  public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
+      throws IOException {
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    // open the reader
+    indexReader.openThriftReader(filePath);
+    // get the index header
+    org.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    List<org.carbondata.format.ColumnSchema> table_columns = readIndexHeader.getTable_columns();
+    for (int i = 0; i < table_columns.size(); i++) {
+      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+    }
+    // get the segment info
+    SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+    BlockletIndex blockletIndex = null;
+    int counter = 0;
+    DataFileFooter dataFileFooter = null;
+    List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+    // read the block info from file
+    while (indexReader.hasNext()) {
+      BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+      blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+      dataFileFooter = new DataFileFooter();
+      dataFileFooter.setBlockletIndex(blockletIndex);
+      dataFileFooter.setColumnInTable(columnSchemaList);
+      dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+      dataFileFooter.setTableBlockInfo(tableBlockInfoList.get(counter++));
+      dataFileFooter.setSegmentInfo(segmentInfo);
+      dataFileFooters.add(dataFileFooter);
+    }
+    return dataFileFooters;
+  }
+
+  /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    */
   public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
deleted file mode 100644
index 328917d..0000000
--- a/core/src/test/java/org/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package org.carbondata.core.carbon.datastore;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.carbondata.core.carbon.CarbonTableIdentifier;
-import org.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.carbondata.core.carbon.datastore.block.TableBlockInfo;
-import org.carbondata.core.carbon.datastore.exception.IndexBuilderException;
-
-import junit.framework.TestCase;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class SegmentTaskIndexStoreTest extends TestCase {
-
-  private SegmentTaskIndexStore indexStore;
-
-  @BeforeClass public void setUp() {
-    indexStore = SegmentTaskIndexStore.getInstance();
-  }
-
-  @Test public void testloadAndGetTaskIdToSegmentsMapForSingleSegment() throws IOException {
-    String canonicalPath =
-        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
-    TableBlockInfo info =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-    Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList = new HashMap<>();
-    mapOfSegmentToTableBlockInfoList.put("0", Arrays.asList(new TableBlockInfo[] { info }));
-    Map<String, AbstractIndex> loadAndGetTaskIdToSegmentsMap = null;
-    try {
-      loadAndGetTaskIdToSegmentsMap = indexStore
-          .loadAndGetTaskIdToSegmentsMap(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier);
-    } catch (IndexBuilderException e) {
-      assertTrue(false);
-    }
-    assertTrue(loadAndGetTaskIdToSegmentsMap.size() == 1);
-    indexStore.removeTableBlocks(Arrays.asList(new String[] { "0" }), absoluteTableIdentifier);
-  }
-
-  //@Test
-  public void testloadAndGetTaskIdToSegmentsMapForSameSegmentLoadedConcurrently()
-      throws IOException {
-    String canonicalPath =
-        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
-    TableBlockInfo info =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-    Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList = new HashMap<>();
-    mapOfSegmentToTableBlockInfoList.put("0", Arrays.asList(new TableBlockInfo[] { info }));
-    ExecutorService executor = Executors.newFixedThreadPool(2);
-
-    executor
-        .submit(new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier));
-    executor
-        .submit(new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier));
-    executor.shutdown();
-    try {
-      executor.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    assertTrue(indexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "0").size() == 1);
-    indexStore.removeTableBlocks(Arrays.asList(new String[] { "0" }), absoluteTableIdentifier);
-  }
-
-  @Test public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently()
-      throws IOException {
-    String canonicalPath =
-        new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath();
-    File file = new File(canonicalPath + "/src/test/resources/part-0-0-1466029397000.carbondata");
-    TableBlockInfo info =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length());
-    TableBlockInfo info1 =
-        new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length());
-    CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("default", "t3", "1");
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier);
-    Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList = new HashMap<>();
-    Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList1 = new HashMap<>();
-    mapOfSegmentToTableBlockInfoList.put("0", Arrays.asList(new TableBlockInfo[] { info }));
-    mapOfSegmentToTableBlockInfoList1.put("1", Arrays.asList(new TableBlockInfo[] { info1 }));
-    ExecutorService executor = Executors.newFixedThreadPool(2);
-
-    executor
-        .submit(new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier));
-    executor.submit(
-        new SegmentLoaderThread(mapOfSegmentToTableBlockInfoList1, absoluteTableIdentifier));
-
-    executor.shutdown();
-    try {
-      executor.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    assertTrue(indexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "0").size() == 1);
-    assertTrue(indexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "1").size() == 1);
-    indexStore.removeTableBlocks(Arrays.asList(new String[] { "0" }), absoluteTableIdentifier);
-    indexStore.removeTableBlocks(Arrays.asList(new String[] { "1" }), absoluteTableIdentifier);
-  }
-
-  private class SegmentLoaderThread implements Callable<Void> {
-    private Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList;
-
-    private AbsoluteTableIdentifier absoluteTableIdentifier;
-
-    public SegmentLoaderThread(Map<String, List<TableBlockInfo>> mapOfSegmentToTableBlockInfoList,
-        AbsoluteTableIdentifier absoluteTableIdentifier) {
-      // TODO Auto-generated constructor stub
-      this.mapOfSegmentToTableBlockInfoList = mapOfSegmentToTableBlockInfoList;
-      this.absoluteTableIdentifier = absoluteTableIdentifier;
-    }
-
-    @Override public Void call() throws Exception {
-      indexStore
-          .loadAndGetTaskIdToSegmentsMap(mapOfSegmentToTableBlockInfoList, absoluteTableIdentifier);
-      return null;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/format/src/main/thrift/carbondataindex.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondataindex.thrift b/format/src/main/thrift/carbondataindex.thrift
new file mode 100644
index 0000000..14159f1
--- /dev/null
+++ b/format/src/main/thrift/carbondataindex.thrift
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * File format description for the carbon file format
+ */
+namespace java org.carbondata.format
+
+include "schema.thrift"
+include "carbondata.thrift"
+
+/**
+ * header information stored in index file
+ */
+struct IndexHeader{
+  1: required i32 version; // version used for data compatibility
+  2: required list<schema.ColumnSchema> table_columns;	// Description of columns in this file
+  3: required carbondata.SegmentInfo segment_info;	// Segment info (will be same/repeated for all files in this segment)
+}
+
+/**
+ * block index information stored in index file for every block
+ */
+struct BlockIndex{
+  1: required i64 num_rows; // Total number of rows in this file
+  2: required string file_name; // Block file name
+  3: required i64 offset; // Offset of block
+  4: required carbondata.BlockletIndex block_index;	// Block index
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java b/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
index 0e310f3..1bdd3d1 100644
--- a/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
+++ b/hadoop/src/main/java/org/carbondata/hadoop/CarbonPathFilter.java
@@ -18,6 +18,8 @@
  */
 package org.carbondata.hadoop;
 
+import org.carbondata.core.carbon.path.CarbonTablePath;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
@@ -37,6 +39,6 @@ public class CarbonPathFilter implements PathFilter {
   }
 
   @Override public boolean accept(Path path) {
-    return true;
+    return CarbonTablePath.isCarbonDataFile(path.getName());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82332b0e/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
index a0fa842..4e49806 100644
--- a/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -40,8 +40,12 @@ import java.util.concurrent.TimeUnit;
 import org.carbondata.common.logging.LogService;
 import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
+import org.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.carbondata.core.carbon.metadata.converter.SchemaConverter;
 import org.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.carbondata.core.carbon.metadata.index.BlockIndexInfo;
 import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
 import org.carbondata.core.carbon.path.CarbonStorePath;
@@ -52,17 +56,20 @@ import org.carbondata.core.datastorage.store.impl.FileFactory;
 import org.carbondata.core.file.manager.composite.FileData;
 import org.carbondata.core.file.manager.composite.IFileManagerComposite;
 import org.carbondata.core.metadata.BlockletInfoColumnar;
+import org.carbondata.core.util.ByteUtil;
 import org.carbondata.core.util.CarbonMergerUtil;
 import org.carbondata.core.util.CarbonMetadataUtil;
 import org.carbondata.core.util.CarbonProperties;
 import org.carbondata.core.util.CarbonUtil;
 import org.carbondata.core.writer.CarbonFooterWriter;
+import org.carbondata.core.writer.CarbonIndexFileWriter;
+import org.carbondata.format.BlockIndex;
 import org.carbondata.format.FileFooter;
+import org.carbondata.format.IndexHeader;
 import org.carbondata.processing.store.CarbonDataFileAttributes;
 import org.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 import org.apache.commons.lang3.ArrayUtils;
-
 import org.apache.hadoop.io.IOUtils;
 
 public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T>
@@ -164,6 +171,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   private int spaceReservedForBlockMetaSize;
   private FileOutputStream fileOutputStream;
 
+  private List<BlockIndexInfo> blockIndexInfoList;
+
   public AbstractFactDataWriter(String storeLocation, int measureCount, int mdKeyLength,
       String databaseName, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize,
       CarbonDataFileAttributes carbonDataFileAttributes, List<ColumnSchema> columnSchema,
@@ -180,6 +189,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     this.storeLocation = storeLocation;
     this.blockletInfoList =
         new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    blockIndexInfoList = new ArrayList<>();
     // get max file size;
     CarbonProperties propInstance = CarbonProperties.getInstance();
     this.fileSizeInBytes = Long.parseLong(propInstance
@@ -223,6 +233,27 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   }
 
   /**
+   * This method will return max of block size and file size
+   *
+   * @param blockSize
+   * @param fileSize
+   * @return
+   */
+  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
+    long maxSize = blockSize;
+    if (fileSize > blockSize) {
+      maxSize = fileSize;
+    }
+    // block size should be exactly divisible by 512 which is  maintained by HDFS as bytes
+    // per checksum, dfs.bytes-per-checksum=512 must divide block size
+    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
+    if (remainder > 0) {
+      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
+    }
+    return maxSize;
+  }
+
+  /**
    * @param isNoDictionary the isNoDictionary to set
    */
   public void setIsNoDictionary(boolean[] isNoDictionary) {
@@ -237,7 +268,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * current file size to 0 close the existing file channel get the new file
    * name and get the channel for new file
    *
-   * @param blockletDataSize  data size of one block
+   * @param blockletDataSize data size of one block
    * @throws CarbonDataWriterException if any problem
    */
   protected void updateBlockletFileChannel(long blockletDataSize) throws CarbonDataWriterException {
@@ -326,11 +357,56 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       FileFooter convertFileMeta = CarbonMetadataUtil
           .convertFileFooter(infoList, localCardinality.length, localCardinality,
               thriftColumnSchemaList);
+      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
       writer.writeFooter(convertFileMeta, currentPosition);
     } catch (IOException e) {
       throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
     }
+  }
 
+  /**
+   * Below method will be used to fill the vlock info details
+   *
+   * @param infoList        info list
+   * @param numberOfRows    number of rows in file
+   * @param filePath        file path
+   * @param currentPosition current offset
+   */
+  private void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
+      String filePath, long currentPosition) {
+
+    // as min-max will change for each blocklet and second blocklet min-max can be lesser than
+    // the first blocklet so we need to calculate the complete block level min-max by taking
+    // the min value of each column and max value of each column
+    byte[][] currentMinValue = infoList.get(0).getColumnMinData().clone();
+    byte[][] currentMaxValue = infoList.get(0).getColumnMaxData().clone();
+    byte[][] minValue = null;
+    byte[][] maxValue = null;
+    for (int i = 1; i < infoList.size(); i++) {
+      minValue = infoList.get(i).getColumnMinData();
+      maxValue = infoList.get(i).getColumnMaxData();
+      for (int j = 0; j < maxValue.length; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
+          currentMinValue[j] = minValue[j].clone();
+        }
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) {
+          currentMaxValue[j] = maxValue[j].clone();
+        }
+      }
+    }
+    // start and end key we can take based on first blocklet
+    // start key will be the block start key as
+    // it is the least key and end blocklet end key will be the block end key as it is the max key
+    BlockletBTreeIndex btree = new BlockletBTreeIndex(infoList.get(0).getStartKey(),
+        infoList.get(infoList.size() - 1).getEndKey());
+    BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
+    minmax.setMinValues(currentMinValue);
+    minmax.setMaxValues(currentMaxValue);
+    BlockletIndex blockletIndex = new BlockletIndex(btree, minmax);
+    BlockIndexInfo blockIndexInfo =
+        new BlockIndexInfo(numberOfRows, filePath.substring(0, filePath.lastIndexOf('.')),
+            currentPosition, blockletIndex);
+    blockIndexInfoList.add(blockIndexInfo);
   }
 
   protected List<org.carbondata.format.ColumnSchema> getColumnSchemaListAndCardinality(
@@ -422,10 +498,44 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
     renameCarbonDataFile();
     copyCarbonDataFileToCarbonStorePath(this.fileName.substring(0, this.fileName.lastIndexOf('.')));
+    try {
+      writeIndexFile();
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the index file", e);
+    }
     closeExecutorService();
   }
 
   /**
+   * Below method will be used to write the idex file
+   *
+   * @throws IOException               throws io exception if any problem while writing
+   * @throws CarbonDataWriterException data writing
+   */
+  private void writeIndexFile() throws IOException, CarbonDataWriterException {
+    // get the header
+    IndexHeader indexHeader =
+        CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList);
+    // get the block index info thrift
+    List<BlockIndex> blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
+    String fileName = storeLocation + File.separator + carbonTablePath
+        .getCarbonIndexFileName(carbonDataFileAttributes.getTaskId(),
+            carbonDataFileAttributes.getFactTimeStamp());
+    CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
+    // open file
+    writer.openThriftWriter(fileName);
+    // write the header first
+    writer.writeThrift(indexHeader);
+    // write the indexes
+    for (BlockIndex blockIndex : blockIndexThrift) {
+      writer.writeThrift(blockIndex);
+    }
+    writer.close();
+    // copy from temp to actual store location
+    copyCarbonDataFileToCarbonStorePath(fileName);
+  }
+
+  /**
    * This method will close the executor service which is used for copying carbon
    * data files to carbon store path
    *
@@ -517,27 +627,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   }
 
   /**
-   * This method will return max of block size and file size
-   *
-   * @param blockSize
-   * @param fileSize
-   * @return
-   */
-  private static long getMaxOfBlockAndFileSize(long blockSize, long fileSize) {
-    long maxSize = blockSize;
-    if (fileSize > blockSize) {
-      maxSize = fileSize;
-    }
-    // block size should be exactly divisible by 512 which is  maintained by HDFS as bytes
-    // per checksum, dfs.bytes-per-checksum=512 must divide block size
-    long remainder = maxSize % HDFS_CHECKSUM_LENGTH;
-    if (remainder > 0) {
-      maxSize = maxSize + HDFS_CHECKSUM_LENGTH - remainder;
-    }
-    return maxSize;
-  }
-
-  /**
    * Write leaf meta data to File.
    *
    * @throws CarbonDataWriterException