Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/12/01 09:53:56 UTC

[3/5] incubator-carbondata git commit: Improve first time query performance

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
new file mode 100644
index 0000000..db9c9be
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.ChunkCompressorMeta;
+import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
+import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
+import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.carbon.metadata.blocklet.sort.SortState;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.format.BlockIndex;
+
+/**
+ * Abstract class to read file footer / index metadata and convert thrift objects to wrapper objects
+ */
+public abstract class AbstractDataFileFooterConverter {
+
+  /**
+   * Below method will be used to convert the thrift presence meta to wrapper
+   * presence meta
+   *
+   * @param presentMetadataThrift
+   * @return wrapper presence meta
+   */
+  private static PresenceMeta getPresenceMeta(
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+    PresenceMeta presenceMeta = new PresenceMeta();
+    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+    presenceMeta.setBitSet(BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream()));
+    return presenceMeta;
+  }
+
+  /**
+   * Below method will be used to get the index info from index file
+   *
+   * @param filePath           file path of the index file
+   * @param tableBlockInfoList list of table block info
+   * @return list of index info
+   * @throws IOException problem while reading the index file
+   */
+  public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
+      throws IOException, CarbonUtilException {
+    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+    List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
+    try {
+      // open the reader
+      indexReader.openThriftReader(filePath);
+      // get the index header
+      org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns =
+          readIndexHeader.getTable_columns();
+      for (int i = 0; i < table_columns.size(); i++) {
+        columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+      }
+      // get the segment info
+      SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
+      BlockletIndex blockletIndex = null;
+      int counter = 0;
+      DataFileFooter dataFileFooter = null;
+      // read the block info from file
+      while (indexReader.hasNext()) {
+        BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
+        blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
+        dataFileFooter = new DataFileFooter();
+        TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++);
+        tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
+        tableBlockInfo.setVersion((short) readIndexHeader.getVersion());
+        int blockletSize = getBlockletSize(readBlockIndexInfo);
+        tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
+        dataFileFooter.setBlockletIndex(blockletIndex);
+        dataFileFooter.setColumnInTable(columnSchemaList);
+        dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
+        dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
+        dataFileFooter.setSegmentInfo(segmentInfo);
+        dataFileFooters.add(dataFileFooter);
+      }
+    } finally {
+      indexReader.closeThriftReader();
+    }
+    return dataFileFooters;
+  }
+
+  /**
+   * Returns the number of blocklets in a block
+   *
+   * @param readBlockIndexInfo
+   * @return
+   */
+  protected int getBlockletSize(BlockIndex readBlockIndexInfo) {
+    long num_rows = readBlockIndexInfo.getNum_rows();
+    int blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
+            CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
+    int remainder = (int) (num_rows % blockletSize);
+    int noOfBlockLet = (int) (num_rows / blockletSize);
+    // the last blocklet may contain fewer records
+    // than the configured blocklet size
+    if (remainder > 0) {
+      noOfBlockLet = noOfBlockLet + 1;
+    }
+    return noOfBlockLet;
+  }
+
+  /**
+   * Below method will be used to convert thrift file meta to wrapper file meta
+   */
+  public abstract DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
+      throws IOException;
+
+  /**
+   * Below method will be used to get blocklet index for data file meta
+   *
+   * @param blockletIndexList
+   * @return blocklet index
+   */
+  protected BlockletIndex getBlockletIndexForDataFileFooter(List<BlockletIndex> blockletIndexList) {
+    BlockletIndex blockletIndex = new BlockletIndex();
+    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+    blockletBTreeIndex.setStartKey(blockletIndexList.get(0).getBtreeIndex().getStartKey());
+    blockletBTreeIndex
+        .setEndKey(blockletIndexList.get(blockletIndexList.size() - 1).getBtreeIndex().getEndKey());
+    blockletIndex.setBtreeIndex(blockletBTreeIndex);
+    byte[][] currentMinValue = blockletIndexList.get(0).getMinMaxIndex().getMinValues().clone();
+    byte[][] currentMaxValue = blockletIndexList.get(0).getMinMaxIndex().getMaxValues().clone();
+    byte[][] minValue = null;
+    byte[][] maxValue = null;
+    for (int i = 1; i < blockletIndexList.size(); i++) {
+      minValue = blockletIndexList.get(i).getMinMaxIndex().getMinValues();
+      maxValue = blockletIndexList.get(i).getMinMaxIndex().getMaxValues();
+      for (int j = 0; j < maxValue.length; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
+          currentMinValue[j] = minValue[j].clone();
+        }
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) {
+          currentMaxValue[j] = maxValue[j].clone();
+        }
+      }
+    }
+
+    BlockletMinMaxIndex minMax = new BlockletMinMaxIndex();
+    minMax.setMaxValues(currentMaxValue);
+    minMax.setMinValues(currentMinValue);
+    blockletIndex.setMinMaxIndex(minMax);
+    return blockletIndex;
+  }
+
+  protected ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
+      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+    ColumnSchema wrapperColumnSchema = new ColumnSchema();
+    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+    wrapperColumnSchema
+        .setDataType(thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type));
+    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+    List<Encoding> encoders = new ArrayList<Encoding>();
+    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+      encoders.add(fromExternalToWrapperEncoding(encoder));
+    }
+    wrapperColumnSchema.setEncodingList(encoders);
+    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+    wrapperColumnSchema.setAggregateFunction(externalColumnSchema.getAggregate_function());
+    return wrapperColumnSchema;
+  }
+
+  /**
+   * Below method converts the thrift encoding to wrapper encoding
+   *
+   * @param encoderThrift thrift encoding
+   * @return wrapper encoding
+   */
+  protected Encoding fromExternalToWrapperEncoding(
+      org.apache.carbondata.format.Encoding encoderThrift) {
+    switch (encoderThrift) {
+      case DICTIONARY:
+        return Encoding.DICTIONARY;
+      case DELTA:
+        return Encoding.DELTA;
+      case RLE:
+        return Encoding.RLE;
+      case INVERTED_INDEX:
+        return Encoding.INVERTED_INDEX;
+      case BIT_PACKED:
+        return Encoding.BIT_PACKED;
+      case DIRECT_DICTIONARY:
+        return Encoding.DIRECT_DICTIONARY;
+      default:
+        throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
+    }
+  }
+
+  /**
+   * Below method will be used to convert the thrift compression to wrapper
+   * compression codec
+   *
+   * @param compressionCodecThrift
+   * @return wrapper compression codec
+   */
+  protected CompressionCodec getCompressionCodec(
+      org.apache.carbondata.format.CompressionCodec compressionCodecThrift) {
+    switch (compressionCodecThrift) {
+      case SNAPPY:
+        return CompressionCodec.SNAPPY;
+      default:
+        return CompressionCodec.SNAPPY;
+    }
+  }
+
+  /**
+   * Below method will be used to convert thrift segment object to wrapper
+   * segment object
+   *
+   * @param segmentInfo thrift segment info object
+   * @return wrapper segment info object
+   */
+  protected SegmentInfo getSegmentInfo(org.apache.carbondata.format.SegmentInfo segmentInfo) {
+    SegmentInfo info = new SegmentInfo();
+    int[] cardinality = new int[segmentInfo.getColumn_cardinalities().size()];
+    for (int i = 0; i < cardinality.length; i++) {
+      cardinality[i] = segmentInfo.getColumn_cardinalities().get(i);
+    }
+    info.setColumnCardinality(cardinality);
+    info.setNumberOfColumns(segmentInfo.getNum_cols());
+    return info;
+  }
+
+  /**
+   * Below method will be used to convert the thrift blocklet index to
+   * the wrapper blocklet index
+   *
+   * @param blockletIndexThrift
+   * @return blocklet index wrapper
+   */
+  protected BlockletIndex getBlockletIndex(
+      org.apache.carbondata.format.BlockletIndex blockletIndexThrift) {
+    org.apache.carbondata.format.BlockletBTreeIndex btreeIndex =
+        blockletIndexThrift.getB_tree_index();
+    org.apache.carbondata.format.BlockletMinMaxIndex minMaxIndex =
+        blockletIndexThrift.getMin_max_index();
+    return new BlockletIndex(
+        new BlockletBTreeIndex(btreeIndex.getStart_key(), btreeIndex.getEnd_key()),
+        new BlockletMinMaxIndex(minMaxIndex.getMin_values(), minMaxIndex.getMax_values()));
+  }
+
+  /**
+   * Below method will be used to convert the thrift compression meta to
+   * wrapper chunk compression meta
+   *
+   * @param chunkCompressionMetaThrift
+   * @return chunkCompressionMetaWrapper
+   */
+  protected ChunkCompressorMeta getChunkCompressionMeta(
+      org.apache.carbondata.format.ChunkCompressionMeta chunkCompressionMetaThrift) {
+    ChunkCompressorMeta compressorMeta = new ChunkCompressorMeta();
+    compressorMeta
+        .setCompressor(getCompressionCodec(chunkCompressionMetaThrift.getCompression_codec()));
+    compressorMeta.setCompressedSize(chunkCompressionMetaThrift.getTotal_compressed_size());
+    compressorMeta.setUncompressedSize(chunkCompressionMetaThrift.getTotal_uncompressed_size());
+    return compressorMeta;
+  }
+
+  /**
+   * Below method will be used to convert the thrift data type to wrapper data
+   * type
+   *
+   * @param dataTypeThrift
+   * @return dataType wrapper
+   */
+  protected DataType thriftDataTyopeToWrapperDataType(
+      org.apache.carbondata.format.DataType dataTypeThrift) {
+    switch (dataTypeThrift) {
+      case STRING:
+        return DataType.STRING;
+      case SHORT:
+        return DataType.SHORT;
+      case INT:
+        return DataType.INT;
+      case LONG:
+        return DataType.LONG;
+      case DOUBLE:
+        return DataType.DOUBLE;
+      case DECIMAL:
+        return DataType.DECIMAL;
+      case TIMESTAMP:
+        return DataType.TIMESTAMP;
+      case ARRAY:
+        return DataType.ARRAY;
+      case STRUCT:
+        return DataType.STRUCT;
+      default:
+        return DataType.STRING;
+    }
+  }
+
+  /**
+   * Below method will be used to convert the thrift object to wrapper object
+   *
+   * @param sortStateThrift
+   * @return wrapper sort state object
+   */
+  protected SortState getSortState(org.apache.carbondata.format.SortState sortStateThrift) {
+    if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_EXPLICIT) {
+      return SortState.SORT_EXPLICT;
+    } else if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_NATIVE) {
+      return SortState.SORT_NATIVE;
+    } else {
+      return SortState.SORT_NONE;
+    }
+  }
+
+  /**
+   * Below method will be used to convert the thrift data chunk to wrapper
+   * data chunk
+   *
+   * @param datachunkThrift
+   * @return wrapper data chunk
+   */
+  protected DataChunk getDataChunk(org.apache.carbondata.format.DataChunk datachunkThrift,
+      boolean isPresenceMetaPresent) {
+    DataChunk dataChunk = new DataChunk();
+    dataChunk.setColumnUniqueIdList(datachunkThrift.getColumn_ids());
+    dataChunk.setDataPageLength(datachunkThrift.getData_page_length());
+    dataChunk.setDataPageOffset(datachunkThrift.getData_page_offset());
+    if (isPresenceMetaPresent) {
+      dataChunk.setNullValueIndexForColumn(getPresenceMeta(datachunkThrift.getPresence()));
+    }
+    dataChunk.setRlePageLength(datachunkThrift.getRle_page_length());
+    dataChunk.setRlePageOffset(datachunkThrift.getRle_page_offset());
+    dataChunk.setRowMajor(datachunkThrift.isRowMajor());
+    dataChunk.setRowIdPageLength(datachunkThrift.getRowid_page_length());
+    dataChunk.setRowIdPageOffset(datachunkThrift.getRowid_page_offset());
+    dataChunk.setSortState(getSortState(datachunkThrift.getSort_state()));
+    dataChunk.setChunkCompressionMeta(getChunkCompressionMeta(datachunkThrift.getChunk_meta()));
+    List<Encoding> encodingList = new ArrayList<Encoding>(datachunkThrift.getEncoders().size());
+    for (int i = 0; i < datachunkThrift.getEncoders().size(); i++) {
+      encodingList.add(fromExternalToWrapperEncoding(datachunkThrift.getEncoders().get(i)));
+    }
+    dataChunk.setEncoderList(encodingList);
+    if (encodingList.contains(Encoding.DELTA)) {
+      List<ByteBuffer> thriftEncoderMeta = datachunkThrift.getEncoder_meta();
+      List<ValueEncoderMeta> encodeMetaList =
+          new ArrayList<ValueEncoderMeta>(thriftEncoderMeta.size());
+      for (int i = 0; i < thriftEncoderMeta.size(); i++) {
+        encodeMetaList.add(CarbonUtil.deserializeEncoderMeta(thriftEncoderMeta.get(i).array()));
+      }
+      dataChunk.setValueEncoderMeta(encodeMetaList);
+    }
+    return dataChunk;
+  }
+
+}
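Note on getBlockletSize above: the blocklet count is a plain ceiling division of the block's row count by the configured blocklet size. A minimal standalone sketch of that arithmetic (numRows and blockletSize stand in for the values read from the block index and from CarbonProperties):

    // ceiling division: a trailing blocklet holds any remainder rows
    static int numberOfBlocklets(long numRows, int blockletSize) {
      int fullBlocklets = (int) (numRows / blockletSize);
      return (numRows % blockletSize > 0) ? fullBlocklets + 1 : fullBlocklets;
    }

    // e.g. numberOfBlocklets(250000, 120000) returns 3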

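getBlockletIndexForDataFileFooter above folds the per-blocklet min/max indexes into one block-level index by keeping, per column, the smallest minimum and the largest maximum byte array. A self-contained sketch of that fold, using java.util.Arrays.compareUnsigned (Java 9+) as a stand-in for ByteUtil.UnsafeComparer:

    // mins[i][j] / maxs[i][j]: min/max bytes of column j in blocklet i
    // uses java.util.Arrays.compareUnsigned for lexicographic byte comparison
    static byte[][][] mergeMinMax(byte[][][] mins, byte[][][] maxs) {
      byte[][] currentMin = mins[0].clone();
      byte[][] currentMax = maxs[0].clone();
      for (int i = 1; i < mins.length; i++) {
        for (int j = 0; j < currentMin.length; j++) {
          if (java.util.Arrays.compareUnsigned(currentMin[j], mins[i][j]) > 0) {
            currentMin[j] = mins[i][j];   // found a smaller column minimum
          }
          if (java.util.Arrays.compareUnsigned(currentMax[j], maxs[i][j]) < 0) {
            currentMax[j] = maxs[i][j];   // found a larger column maximum
          }
        }
      }
      return new byte[][][] { currentMin, currentMax };
    }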
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index de0ea44..4f8a435 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression;
 import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -40,11 +41,13 @@ import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletBTreeIndex;
 import org.apache.carbondata.format.BlockletIndex;
 import org.apache.carbondata.format.BlockletInfo;
+import org.apache.carbondata.format.BlockletInfo2;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ChunkCompressionMeta;
 import org.apache.carbondata.format.ColumnSchema;
 import org.apache.carbondata.format.CompressionCodec;
 import org.apache.carbondata.format.DataChunk;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.format.IndexHeader;
@@ -72,22 +75,60 @@ public class CarbonMetadataUtil {
    * @return FileFooter
    */
   public static FileFooter convertFileFooter(List<BlockletInfoColumnar> infoList, int numCols,
-      int[] cardinalities, List<ColumnSchema> columnSchemaList,
-      SegmentProperties segmentProperties) throws IOException {
+      int[] cardinalities, List<ColumnSchema> columnSchemaList, SegmentProperties segmentProperties)
+      throws IOException {
+    FileFooter footer = getFileFooter(infoList, cardinalities, columnSchemaList);
+    for (BlockletInfoColumnar info : infoList) {
+      footer.addToBlocklet_info_list(getBlockletInfo(info, columnSchemaList, segmentProperties));
+    }
+    return footer;
+  }
 
+  /**
+   * Below method will be used to get the file footer object
+   *
+   * @param infoList         blocklet info
+   * @param cardinalities    cardinality of dimension columns
+   * @param columnSchemaList column schema list
+   * @return file footer
+   */
+  private static FileFooter getFileFooter(List<BlockletInfoColumnar> infoList, int[] cardinalities,
+      List<ColumnSchema> columnSchemaList) {
     SegmentInfo segmentInfo = new SegmentInfo();
     segmentInfo.setNum_cols(columnSchemaList.size());
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
-
+    short version = Short.parseShort(
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
     FileFooter footer = new FileFooter();
+    footer.setVersion(version);
     footer.setNum_rows(getTotalNumberOfRows(infoList));
     footer.setSegment_info(segmentInfo);
+    footer.setTable_columns(columnSchemaList);
     for (BlockletInfoColumnar info : infoList) {
       footer.addToBlocklet_index_list(getBlockletIndex(info));
     }
-    footer.setTable_columns(columnSchemaList);
+    return footer;
+  }
+
+  /**
+   * Below method will be used to get the file footer object for the version 2 data file
+   *
+   * @param infoList         blocklet info
+   * @param cardinalities    cardinality of each column
+   * @param columnSchemaList column schema list
+   * @param dataChunksOffset data chunks offsets
+   * @param dataChunksLength data chunks length
+   * @return file footer thrift object
+   */
+  public static FileFooter convertFilterFooter2(List<BlockletInfoColumnar> infoList,
+      int[] cardinalities, List<ColumnSchema> columnSchemaList, List<List<Long>> dataChunksOffset,
+      List<List<Short>> dataChunksLength) {
+    FileFooter footer = getFileFooter(infoList, cardinalities, columnSchemaList);
+    int index = 0;
     for (BlockletInfoColumnar info : infoList) {
-      footer.addToBlocklet_info_list(getBlockletInfo(info, columnSchemaList, segmentProperties));
+      footer.addToBlocklet_info_list2(
+          getBlockletInfo2(info, dataChunksOffset.get(index), dataChunksLength.get(index)));
+      index++;
     }
     return footer;
   }
@@ -142,15 +183,31 @@ public class CarbonMetadataUtil {
     return blockletIndex;
   }
 
+  /**
+   * Below method will be used to get the blocklet info object for
+   * the version 2 data file
+   *
+   * @param blockletInfoColumnar blocklet info
+   * @param dataChunkOffsets     data chunks offsets
+   * @param dataChunksLength     data chunks length
+   * @return blocklet info version 2
+   */
+  private static BlockletInfo2 getBlockletInfo2(BlockletInfoColumnar blockletInfoColumnar,
+      List<Long> dataChunkOffsets, List<Short> dataChunksLength) {
+    BlockletInfo2 blockletInfo = new BlockletInfo2();
+    blockletInfo.setNum_rows(blockletInfoColumnar.getNumberOfKeys());
+    blockletInfo.setColumn_data_chunks_length(dataChunksLength);
+    blockletInfo.setColumn_data_chunks_offsets(dataChunkOffsets);
+    return blockletInfo;
+  }
+
   private static BlockletInfo getBlockletInfo(BlockletInfoColumnar blockletInfoColumnar,
-      List<ColumnSchema> columnSchenma,
-      SegmentProperties segmentProperties) throws IOException {
+      List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties) throws IOException {
 
     BlockletInfo blockletInfo = new BlockletInfo();
     blockletInfo.setNum_rows(blockletInfoColumnar.getNumberOfKeys());
 
     List<DataChunk> colDataChunks = new ArrayList<DataChunk>();
-    blockletInfoColumnar.getKeyLengths();
     int j = 0;
     int aggregateIndex = 0;
     boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn();
@@ -419,6 +476,9 @@ public class CarbonMetadataUtil {
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     // create index header object
     IndexHeader indexHeader = new IndexHeader();
+    short version = Short.parseShort(
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+    indexHeader.setVersion(version);
     // set the segment info
     indexHeader.setSegment_info(segmentInfo);
     // set the column names
@@ -440,11 +500,91 @@ public class CarbonMetadataUtil {
     for (BlockIndexInfo blockIndexInfo : blockIndexInfoList) {
       blockIndex = new BlockIndex();
       blockIndex.setNum_rows(blockIndexInfo.getNumberOfRows());
-      blockIndex.setOffset(blockIndexInfo.getNumberOfRows());
+      blockIndex.setOffset(blockIndexInfo.getOffset());
       blockIndex.setFile_name(blockIndexInfo.getFileName());
       blockIndex.setBlock_index(getBlockletIndex(blockIndexInfo.getBlockletIndex()));
       thriftBlockIndexList.add(blockIndex);
     }
     return thriftBlockIndexList;
   }
+
+  /**
+   * Below method will be used to get the data chunk object for all the
+   * columns
+   *
+   * @param blockletInfoColumnar blocklet info
+   * @param columnSchenma        list of columns
+   * @param segmentProperties    segment properties
+   * @return list of data chunks
+   * @throws IOException
+   */
+  public static List<DataChunk2> getDatachunk2(BlockletInfoColumnar blockletInfoColumnar,
+      List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties) throws IOException {
+    List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>();
+    int rowIdIndex = 0;
+    int aggregateIndex = 0;
+    boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn();
+    boolean[] aggKeyBlock = blockletInfoColumnar.getAggKeyBlock();
+    boolean[] colGrpblock = blockletInfoColumnar.getColGrpBlocks();
+    for (int i = 0; i < blockletInfoColumnar.getKeyLengths().length; i++) {
+      DataChunk2 dataChunk = new DataChunk2();
+      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      List<Encoding> encodings = new ArrayList<Encoding>();
+      if (containsEncoding(i, Encoding.DICTIONARY, columnSchenma, segmentProperties)) {
+        encodings.add(Encoding.DICTIONARY);
+      }
+      if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) {
+        encodings.add(Encoding.DIRECT_DICTIONARY);
+      }
+      dataChunk.setRowMajor(colGrpblock[i]);
+      //TODO : Once the schema PR is merged, this information needs to be passed here.
+      dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
+      if (aggKeyBlock[i]) {
+        dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
+        encodings.add(Encoding.RLE);
+        aggregateIndex++;
+      }
+      dataChunk
+          .setSort_state(isSortedKeyColumn[i] ? SortState.SORT_EXPLICIT : SortState.SORT_NATIVE);
+
+      if (!isSortedKeyColumn[i]) {
+        dataChunk.setRowid_page_length(blockletInfoColumnar.getKeyBlockIndexLength()[rowIdIndex]);
+        encodings.add(Encoding.INVERTED_INDEX);
+        rowIdIndex++;
+      }
+
+      //TODO : Right now the encodings happen at runtime; change as per these encoders.
+      dataChunk.setEncoders(encodings);
+
+      colDataChunks.add(dataChunk);
+    }
+
+    for (int i = 0; i < blockletInfoColumnar.getMeasureLength().length; i++) {
+      DataChunk2 dataChunk = new DataChunk2();
+      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      dataChunk.setRowMajor(false);
+      //TODO : Once the schema PR is merged, this information needs to be passed here.
+      dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
+      //TODO : Right now the encodings happen at runtime; change as per these encoders.
+      List<Encoding> encodings = new ArrayList<Encoding>();
+      encodings.add(Encoding.DELTA);
+      dataChunk.setEncoders(encodings);
+      //TODO : writing dummy presence meta, need to set the actual presence meta
+      PresenceMeta presenceMeta = new PresenceMeta();
+      presenceMeta.setPresent_bit_streamIsSet(true);
+      presenceMeta.setPresent_bit_stream(SnappyByteCompression.INSTANCE
+          .compress(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()));
+      dataChunk.setPresence(presenceMeta);
+      //TODO : Need to write ValueCompression meta here.
+      List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
+      encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
+          createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
+      dataChunk.setEncoder_meta(encoderMetaList);
+      colDataChunks.add(dataChunk);
+    }
+    return colDataChunks;
+  }
 }

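In the version 2 layout targeted by getBlockletInfo2 above, the footer records the offset and length of every serialized data chunk, so a reader can seek straight to the DataChunk2 it needs instead of parsing all chunk metadata up front. A minimal illustration of that bookkeeping (currentFileOffset and serializedChunks are hypothetical writer state; the serialized bytes would come from something like CarbonUtil.getByteArray(dataChunk2)):

    // java.util.List / java.util.ArrayList
    List<Long> chunkOffsets = new ArrayList<Long>();
    List<Short> chunkLengths = new ArrayList<Short>();
    long fileOffset = currentFileOffset;               // where the first chunk starts
    for (byte[] serializedChunk : serializedChunks) {
      chunkOffsets.add(fileOffset);                    // absolute position of this chunk
      chunkLengths.add((short) serializedChunk.length);
      fileOffset += serializedChunk.length;            // next chunk starts right after
    }
    // chunkOffsets and chunkLengths feed the dataChunksOffset and
    // dataChunksLength arguments of convertFilterFooter2 above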
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index b856928..adb0e6a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -83,6 +83,7 @@ public final class CarbonProperties {
     validateHighCardinalityIdentify();
     validateHighCardinalityThreshold();
     validateHighCardinalityInRowCountPercentage();
+    validateCarbonDataFileVersion();
   }
 
   private void validateBadRecordsLocation() {
@@ -106,15 +107,15 @@ public final class CarbonProperties {
       if (blockletSize < CarbonCommonConstants.BLOCKLET_SIZE_MIN_VAL
           || blockletSize > CarbonCommonConstants.BLOCKLET_SIZE_MAX_VAL) {
         LOGGER.info("The blocklet size value \"" + blockletSizeStr
-                + "\" is invalid. Using the default value \""
-                + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+            + "\" is invalid. Using the default value \""
+            + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
         carbonProperties.setProperty(CarbonCommonConstants.BLOCKLET_SIZE,
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
       }
     } catch (NumberFormatException e) {
       LOGGER.info("The blocklet size value \"" + blockletSizeStr
-              + "\" is invalid. Using the default value \""
-              + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
+          + "\" is invalid. Using the default value \""
+          + CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
       carbonProperties.setProperty(CarbonCommonConstants.BLOCKLET_SIZE,
           CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
     }
@@ -131,16 +132,16 @@ public final class CarbonProperties {
 
       if (numCores < CarbonCommonConstants.NUM_CORES_MIN_VAL
           || numCores > CarbonCommonConstants.NUM_CORES_MAX_VAL) {
-        LOGGER.info("The num Cores  value \"" + numCoresStr
-            + "\" is invalid. Using the default value \""
-            + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+        LOGGER.info(
+            "The num Cores  value \"" + numCoresStr + "\" is invalid. Using the default value \""
+                + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
         carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES,
             CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
       }
     } catch (NumberFormatException e) {
-      LOGGER.info("The num Cores  value \"" + numCoresStr
-          + "\" is invalid. Using the default value \""
-          + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+      LOGGER.info(
+          "The num Cores  value \"" + numCoresStr + "\" is invalid. Using the default value \""
+              + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
       carbonProperties.setProperty(CarbonCommonConstants.NUM_CORES,
           CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
     }
@@ -150,9 +151,8 @@ public final class CarbonProperties {
    * This method validates the number cores specified for mdk block sort
    */
   private void validateNumCoresBlockSort() {
-    String numCoresStr = carbonProperties
-        .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
-            CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL);
+    String numCoresStr = carbonProperties.getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
+        CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL);
     try {
       int numCores = Integer.parseInt(numCoresStr);
 
@@ -183,25 +183,25 @@ public final class CarbonProperties {
       int sortSize = Integer.parseInt(sortSizeStr);
 
       if (sortSize < CarbonCommonConstants.SORT_SIZE_MIN_VAL) {
-        LOGGER.info("The batch size value \"" + sortSizeStr
-            + "\" is invalid. Using the default value \""
-            + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
+        LOGGER.info(
+            "The batch size value \"" + sortSizeStr + "\" is invalid. Using the default value \""
+                + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
         carbonProperties.setProperty(CarbonCommonConstants.SORT_SIZE,
             CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
       }
     } catch (NumberFormatException e) {
-      LOGGER.info("The batch size value \"" + sortSizeStr
-          + "\" is invalid. Using the default value \""
-          + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
+      LOGGER.info(
+          "The batch size value \"" + sortSizeStr + "\" is invalid. Using the default value \""
+              + CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
       carbonProperties.setProperty(CarbonCommonConstants.SORT_SIZE,
           CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL);
     }
   }
 
   private void validateHighCardinalityIdentify() {
-    String highcardIdentifyStr = carbonProperties.getProperty(
-        CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
-        CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
+    String highcardIdentifyStr = carbonProperties
+        .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
+            CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT);
     try {
       Boolean.parseBoolean(highcardIdentifyStr);
     } catch (NumberFormatException e) {
@@ -214,12 +214,12 @@ public final class CarbonProperties {
   }
 
   private void validateHighCardinalityThreshold() {
-    String highcardThresholdStr = carbonProperties.getProperty(
-        CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
-        CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT);
+    String highcardThresholdStr = carbonProperties
+        .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
+            CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT);
     try {
       int highcardThreshold = Integer.parseInt(highcardThresholdStr);
-      if(highcardThreshold < CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN){
+      if (highcardThreshold < CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN) {
         LOGGER.info("The high cardinality threshold value \"" + highcardThresholdStr
             + "\" is invalid. Using the min value \""
             + CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_MIN);
@@ -236,22 +236,22 @@ public final class CarbonProperties {
   }
 
   private void validateHighCardinalityInRowCountPercentage() {
-    String highcardPercentageStr = carbonProperties.getProperty(
-        CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
-        CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
+    String highcardPercentageStr = carbonProperties
+        .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+            CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
     try {
       double highcardPercentage = Double.parseDouble(highcardPercentageStr);
-      if(highcardPercentage <= 0){
-        LOGGER.info("The percentage of high cardinality in row count value \""
-            + highcardPercentageStr + "\" is invalid. Using the default value \""
-            + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
-        carbonProperties.setProperty(
-            CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
+      if (highcardPercentage <= 0) {
+        LOGGER.info(
+            "The percentage of high cardinality in row count value \"" + highcardPercentageStr
+                + "\" is invalid. Using the default value \""
+                + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
+        carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
             CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
       }
     } catch (NumberFormatException e) {
-      LOGGER.info("The percentage of high cardinality in row count value \""
-          + highcardPercentageStr + "\" is invalid. Using the default value \""
+      LOGGER.info("The percentage of high cardinality in row count value \"" + highcardPercentageStr
+          + "\" is invalid. Using the default value \""
           + CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
       carbonProperties.setProperty(CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
           CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT);
@@ -259,6 +259,34 @@ public final class CarbonProperties {
   }
 
   /**
+   * Below method will be used to validate the data file version parameter;
+   * if the parameter is invalid, the default version will be set
+   */
+  private void validateCarbonDataFileVersion() {
+    short carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+    String carbondataFileVersionString =
+        carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
+    try {
+      carbondataFileVersion = Short.parseShort(carbondataFileVersionString);
+    } catch (NumberFormatException e) {
+      carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+      LOGGER.info("Current Data file version property is invalid  \"" + carbondataFileVersionString
+          + "\" is invalid. Using the Current data file version value \"" + carbondataFileVersion);
+      carbonProperties
+          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + "");
+    }
+    if (carbondataFileVersion > CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
+        || carbondataFileVersion < 0) {
+      LOGGER.info("Current Data file version property is invalid  \"" + carbondataFileVersionString
+          + "\" is invalid. Using the Current data file version value \"" + carbondataFileVersion);
+      carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+      carbonProperties
+          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + "");
+    }
+
+  }
+
+  /**
    * This method will read all the properties from file and load it into
    * memory
    */
@@ -278,18 +306,18 @@ public final class CarbonProperties {
         carbonProperties.load(fis);
       }
     } catch (FileNotFoundException e) {
-      LOGGER.error("The file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH
-          + " does not exist");
+      LOGGER.error(
+          "The file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH + " does not exist");
     } catch (IOException e) {
-      LOGGER.error("Error while reading the file: "
-          + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
+      LOGGER.error(
+          "Error while reading the file: " + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
     } finally {
       if (null != fis) {
         try {
           fis.close();
         } catch (IOException e) {
           LOGGER.error("Error while closing the file stream for file: "
-                  + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
+              + CarbonCommonConstants.CARBON_PROPERTIES_FILE_PATH);
         }
       }
     }
@@ -402,6 +430,7 @@ public final class CarbonProperties {
 
   /**
   * getting the unmerged segment numbers to be merged.
+   *
    * @return
    */
   public int[] getCompactionSegmentLevelCount() {
@@ -411,7 +440,7 @@ public final class CarbonProperties {
         CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
     int[] compactionSize = getIntArray(commaSeparatedLevels);
 
-    if(null == compactionSize){
+    if (null == compactionSize) {
       compactionSize = getIntArray(CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
     }
 
@@ -419,7 +448,6 @@ public final class CarbonProperties {
   }
 
   /**
-   *
    * @param commaSeparatedLevels
    * @return
    */
@@ -430,13 +458,12 @@ public final class CarbonProperties {
     for (String levelSize : levels) {
       try {
         int size = Integer.parseInt(levelSize.trim());
-        if(validate(size,100,0,-1) < 0 ){
+        if (validate(size, 100, 0, -1) < 0) {
           // if given size is out of boundary then take default value for all levels.
           return null;
         }
         compactionSize[i++] = size;
-      }
-      catch(NumberFormatException e){
+      } catch (NumberFormatException e) {
         LOGGER.error(
             "Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD
                 + " is not proper. Taking the default value "

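The validation in validateCarbonDataFileVersion above falls back to the default version whenever the property is missing, non-numeric, negative, or newer than the current default. A compact sketch of the same rule (defaultVersion stands in for CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION):

    static short validatedVersion(String value, short defaultVersion) {
      short version;
      try {
        version = Short.parseShort(value);  // null or garbage throws NumberFormatException
      } catch (NumberFormatException e) {
        return defaultVersion;
      }
      // versions newer than the current default (or negative) are rejected
      return (version > defaultVersion || version < 0) ? defaultVersion : version;
    }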
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 3c976db..162e9b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -21,6 +21,8 @@
 package org.apache.carbondata.core.util;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
@@ -28,6 +30,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
@@ -49,7 +52,6 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
@@ -67,11 +69,20 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
+import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.scan.model.QueryDimension;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
 import org.pentaho.di.core.exception.KettleException;
 
 
@@ -946,29 +957,25 @@ public final class CarbonUtil {
    * @return value compression model
    */
   public static ValueCompressionModel getValueCompressionModel(
-      List<DataChunk> measureDataChunkList) {
-    Object[] maxValue = new Object[measureDataChunkList.size()];
-    Object[] minValue = new Object[measureDataChunkList.size()];
-    Object[] uniqueValue = new Object[measureDataChunkList.size()];
-    int[] decimal = new int[measureDataChunkList.size()];
-    char[] type = new char[measureDataChunkList.size()];
-    byte[] dataTypeSelected = new byte[measureDataChunkList.size()];
+      List<ValueEncoderMeta> encodeMetaList) {
+    Object[] maxValue = new Object[encodeMetaList.size()];
+    Object[] minValue = new Object[encodeMetaList.size()];
+    Object[] uniqueValue = new Object[encodeMetaList.size()];
+    int[] decimal = new int[encodeMetaList.size()];
+    char[] type = new char[encodeMetaList.size()];
+    byte[] dataTypeSelected = new byte[encodeMetaList.size()];
 
     /**
      * to fill the meta data required for value compression model
      */
     for (int i = 0; i < dataTypeSelected.length; i++) {
-      int indexOf = measureDataChunkList.get(i).getEncodingList().indexOf(Encoding.DELTA);
-      if (indexOf > -1) {
-        ValueEncoderMeta valueEncoderMeta =
-            measureDataChunkList.get(i).getValueEncoderMeta().get(indexOf);
-        maxValue[i] = valueEncoderMeta.getMaxValue();
-        minValue[i] = valueEncoderMeta.getMinValue();
-        uniqueValue[i] = valueEncoderMeta.getUniqueValue();
-        decimal[i] = valueEncoderMeta.getDecimal();
-        type[i] = valueEncoderMeta.getType();
-        dataTypeSelected[i] = valueEncoderMeta.getDataTypeSelected();
-      }
+      ValueEncoderMeta valueEncoderMeta = encodeMetaList.get(i);
+      maxValue[i] = valueEncoderMeta.getMaxValue();
+      minValue[i] = valueEncoderMeta.getMinValue();
+      uniqueValue[i] = valueEncoderMeta.getUniqueValue();
+      decimal[i] = valueEncoderMeta.getDecimal();
+      type[i] = valueEncoderMeta.getType();
+      dataTypeSelected[i] = valueEncoderMeta.getDataTypeSelected();
     }
     MeasureMetaDataModel measureMetadataModel =
         new MeasureMetaDataModel(minValue, maxValue, decimal, dataTypeSelected.length, uniqueValue,
@@ -1055,11 +1062,13 @@ public final class CarbonUtil {
    * @return Data file metadata instance
    * @throws CarbonUtilException
    */
-  public static DataFileFooter readMetadatFile(String filePath, long blockOffset, long blockLength)
+  public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo)
       throws CarbonUtilException {
-    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    AbstractDataFileFooterConverter fileFooterConverter =
+        DataFileFooterConverterFactory.getInstance()
+            .getDataFileFooterConverter(tableBlockInfo.getVersion());
     try {
-      return fileFooterConverter.readDataFileFooter(filePath, blockOffset, blockLength);
+      return fileFooterConverter.readDataFileFooter(tableBlockInfo);
     } catch (IOException e) {
       throw new CarbonUtilException("Problem while reading the file metadata", e);
     }
@@ -1462,5 +1471,161 @@ public final class CarbonUtil {
     return segmentStringbuilder.toString();
   }
 
+  /**
+   * Below method will be used to convert the thrift object to byte array.
+   */
+  public static byte[] getByteArray(TBase t) {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    byte[] thriftByteArray = null;
+    TProtocol binaryOut = new TCompactProtocol(new TIOStreamTransport(stream));
+    try {
+      t.write(binaryOut);
+      stream.flush();
+      thriftByteArray = stream.toByteArray();
+    } catch (TException | IOException e) {
+      LOGGER.error(e);
+    } finally {
+      closeStreams(stream);
+    }
+    return thriftByteArray;
+  }
+
+  /**
+   * Below method will be used to convert the byte array to a data chunk object
+   *
+   * @param dataChunkBytes datachunk thrift object in bytes
+   * @return data chunk thrift object
+   */
+  public static DataChunk2 readDataChunk(byte[] dataChunkBytes) {
+    try {
+      return (DataChunk2) read(dataChunkBytes, new ThriftReader.TBaseCreator() {
+        @Override public TBase create() {
+          return new DataChunk2();
+        }
+      });
+    } catch (IOException e) {
+      LOGGER.error(e);
+    }
+    return null;
+  }
+
+  /**
+   * Below method will be used to convert the byte array value to thrift object for
+   * data chunk
+   *
+   * @param data    thrift byte array
+   * @param creator type of thrift
+   * @return thrift object
+   * @throws IOException any problem while converting the object
+   */
+  private static TBase read(byte[] data, TBaseCreator creator) throws IOException {
+    ByteArrayInputStream stream = new ByteArrayInputStream(data);
+    TProtocol binaryIn = new TCompactProtocol(new TIOStreamTransport(stream));
+    TBase t = creator.create();
+    try {
+      t.read(binaryIn);
+    } catch (TException e) {
+      throw new IOException(e);
+    } finally {
+      CarbonUtil.closeStreams(stream);
+    }
+    return t;
+  }
+
+  /**
+   * Below method will be used to convert the encode metadata to
+   * ValueEncoderMeta object
+   *
+   * @param encoderMeta
+   * @return ValueEncoderMeta object
+   */
+  public static ValueEncoderMeta deserializeEncoderMeta(byte[] encoderMeta) {
+    // TODO : should remove the unnecessary fields.
+    ByteArrayInputStream aos = null;
+    ObjectInputStream objStream = null;
+    ValueEncoderMeta meta = null;
+    try {
+      aos = new ByteArrayInputStream(encoderMeta);
+      objStream = new ObjectInputStream(aos);
+      meta = (ValueEncoderMeta) objStream.readObject();
+    } catch (ClassNotFoundException | IOException e) {
+      LOGGER.error(e);
+    } finally {
+      // close the stream on both the success and failure paths
+      CarbonUtil.closeStreams(objStream);
+    }
+    return meta;
+  }
+
+  /**
+   * Below method will be used to convert indexes into ranges.
+   * Example: indexes = [0,1,2,3,4,5,6,7,8,9],
+   * length = 9,
+   * number of elements in a group = 5;
+   * then the output ranges will be [0,4], [5,8], [9,9]
+   *
+   * @param indexes                indexes
+   * @param length                 number of element to be considered
+   * @param numberOfElementInGroup number of element in group
+   * @return range indexes
+   */
+  public static int[][] getRangeIndex(int[] indexes, int length, int numberOfElementInGroup) {
+    List<List<Integer>> rangeList = new ArrayList<>();
+    int[][] outputArray = null;
+    int k = 0;
+    int index = 1;
+    if (indexes.length == 1) {
+      outputArray = new int[1][2];
+      outputArray[0][0] = indexes[0];
+      outputArray[0][1] = indexes[0];
+      return outputArray;
+    }
+    while (index < length) {
+      if (indexes[index] - indexes[index - 1] == 1 && k < numberOfElementInGroup - 1) {
+        k++;
+      } else {
+        if (k > 0) {
+          List<Integer> range = new ArrayList<>();
+          rangeList.add(range);
+          range.add(indexes[index - k - 1]);
+          range.add(indexes[index - 1]);
+        } else {
+          List<Integer> range = new ArrayList<>();
+          rangeList.add(range);
+          range.add(indexes[index - 1]);
+        }
+        k = 0;
+      }
+      index++;
+    }
+    if (k > 0) {
+      List<Integer> range = new ArrayList<>();
+      rangeList.add(range);
+      range.add(indexes[index - k - 1]);
+      range.add(indexes[index - 1]);
+    } else {
+      List<Integer> range = new ArrayList<>();
+      rangeList.add(range);
+      range.add(indexes[index - 1]);
+
+    }
+    if (length != indexes.length) {
+      List<Integer> range = new ArrayList<>();
+      rangeList.add(range);
+      range.add(indexes[indexes.length - 1]);
+    }
+
+    // as we are dividing into ranges, each entry always has 2 elements (start, end)
+    outputArray = new int[rangeList.size()][2];
+    for (int i = 0; i < outputArray.length; i++) {
+      if (rangeList.get(i).size() == 1) {
+        outputArray[i][0] = rangeList.get(i).get(0);
+        outputArray[i][1] = rangeList.get(i).get(0);
+      } else {
+        outputArray[i][0] = rangeList.get(i).get(0);
+        outputArray[i][1] = rangeList.get(i).get(1);
+      }
+    }
+    return outputArray;
+  }
 }
 
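getByteArray and readDataChunk above form a compact-protocol round trip for DataChunk2. A short usage sketch (the chunk built here is a hypothetical example):

    DataChunk2 dataChunk = new DataChunk2();
    dataChunk.setData_page_length(1024);
    byte[] bytes = CarbonUtil.getByteArray(dataChunk);   // thrift object -> bytes
    DataChunk2 copy = CarbonUtil.readDataChunk(bytes);   // bytes -> thrift object, null on failure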

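And a worked call of getRangeIndex, reproducing the example from its javadoc:

    int[] indexes = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    int[][] ranges = CarbonUtil.getRangeIndex(indexes, 9, 5);
    // ranges is { {0, 4}, {5, 8}, {9, 9} }:
    // the groups [0..4] and [5..8], plus the trailing element 9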
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index 5f3565c..ea1324e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -18,135 +18,44 @@
  */
 package org.apache.carbondata.core.util;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.carbon.metadata.blocklet.SegmentInfo;
-import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.ChunkCompressorMeta;
-import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec;
 import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
-import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
-import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletBTreeIndex;
 import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
-import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.carbon.metadata.blocklet.sort.SortState;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.reader.CarbonFooterReader;
-import org.apache.carbondata.core.reader.CarbonIndexFileReader;
-import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.FileFooter;
 
 /**
  * Below class will be used to convert the thrift object of data file
  * meta data to wrapper object
  */
-public class DataFileFooterConverter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataFileFooterConverter.class.getName());
-
-  /**
-   * Below method will be used to get the index info from index file
-   *
-   * @param filePath           file path of the index file
-   * @param tableBlockInfoList table block index
-   * @return list of index info
-   * @throws IOException problem while reading the index file
-   */
-  public List<DataFileFooter> getIndexInfo(String filePath, List<TableBlockInfo> tableBlockInfoList)
-      throws IOException, CarbonUtilException {
-    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
-    List<DataFileFooter> dataFileFooters = new ArrayList<DataFileFooter>();
-    try {
-      // open the reader
-      indexReader.openThriftReader(filePath);
-      // get the index header
-      org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
-      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
-      List<org.apache.carbondata.format.ColumnSchema> table_columns =
-          readIndexHeader.getTable_columns();
-      for (int i = 0; i < table_columns.size(); i++) {
-        columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
-      }
-      // get the segment info
-      SegmentInfo segmentInfo = getSegmentInfo(readIndexHeader.getSegment_info());
-      BlockletIndex blockletIndex = null;
-      int counter = 0;
-      DataFileFooter dataFileFooter = null;
-      // read the block info from file
-      while (indexReader.hasNext()) {
-        BlockIndex readBlockIndexInfo = indexReader.readBlockIndexInfo();
-        blockletIndex = getBlockletIndex(readBlockIndexInfo.getBlock_index());
-        dataFileFooter = new DataFileFooter();
-        TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++);
-        int blockletSize = getBlockletSize(readBlockIndexInfo);
-        tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
-        dataFileFooter.setBlockletIndex(blockletIndex);
-        dataFileFooter.setColumnInTable(columnSchemaList);
-        dataFileFooter.setNumberOfRows(readBlockIndexInfo.getNum_rows());
-        dataFileFooter.setBlockInfo(new BlockInfo(tableBlockInfo));
-        dataFileFooter.setSegmentInfo(segmentInfo);
-        dataFileFooters.add(dataFileFooter);
-      }
-    } finally {
-      indexReader.closeThriftReader();
-    }
-    return dataFileFooters;
-  }
-
-  /**
-   * the methods returns the number of blocklets in a block
-   * @param readBlockIndexInfo
-   * @return
-   */
-  private int getBlockletSize(BlockIndex readBlockIndexInfo) {
-    long num_rows = readBlockIndexInfo.getNum_rows();
-    int blockletSize = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
-            CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
-    int remainder = (int) (num_rows % blockletSize);
-    int noOfBlockLet = (int) (num_rows / blockletSize);
-    // there could be some blocklets which will not
-    // contain the total records equal to the blockletSize
-    if (remainder > 0) {
-      noOfBlockLet = noOfBlockLet + 1;
-    }
-    return noOfBlockLet;
-  }
+public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
 
   /**
    * Below method will be used to convert thrift file meta to wrapper file meta
    */
-  public DataFileFooter readDataFileFooter(String filePath, long blockOffset, long blockLength)
+  @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
       throws IOException {
     DataFileFooter dataFileFooter = new DataFileFooter();
     FileHolder fileReader = null;
     try {
-      long completeBlockLength = blockOffset + blockLength;
+      long completeBlockLength = tableBlockInfo.getBlockLength();
       long footerPointer = completeBlockLength - 8;
-      fileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath));
-      long actualFooterOffset = fileReader.readLong(filePath, footerPointer);
-      CarbonFooterReader reader = new CarbonFooterReader(filePath, actualFooterOffset);
+      fileReader = FileFactory.getFileHolder(FileFactory.getFileType(tableBlockInfo.getFilePath()));
+      long actualFooterOffset = fileReader.readLong(tableBlockInfo.getFilePath(), footerPointer);
+      CarbonFooterReader reader =
+          new CarbonFooterReader(tableBlockInfo.getFilePath(), actualFooterOffset);
       FileFooter footer = reader.readFooter();
-      dataFileFooter.setVersionId(footer.getVersion());
+      dataFileFooter.setVersionId((short) footer.getVersion());
       dataFileFooter.setNumberOfRows(footer.getNum_rows());
       dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
       List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
@@ -183,66 +92,6 @@ public class DataFileFooterConverter {
   }
 
   /**
-   * Below method will be used to get blocklet index for data file meta
-   *
-   * @param blockletIndexList
-   * @return blocklet index
-   */
-  private BlockletIndex getBlockletIndexForDataFileFooter(List<BlockletIndex> blockletIndexList) {
-    BlockletIndex blockletIndex = new BlockletIndex();
-    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
-    blockletBTreeIndex.setStartKey(blockletIndexList.get(0).getBtreeIndex().getStartKey());
-    blockletBTreeIndex
-        .setEndKey(blockletIndexList.get(blockletIndexList.size() - 1).getBtreeIndex().getEndKey());
-    blockletIndex.setBtreeIndex(blockletBTreeIndex);
-    byte[][] currentMinValue = blockletIndexList.get(0).getMinMaxIndex().getMinValues().clone();
-    byte[][] currentMaxValue = blockletIndexList.get(0).getMinMaxIndex().getMaxValues().clone();
-    byte[][] minValue = null;
-    byte[][] maxValue = null;
-    for (int i = 1; i < blockletIndexList.size(); i++) {
-      minValue = blockletIndexList.get(i).getMinMaxIndex().getMinValues();
-      maxValue = blockletIndexList.get(i).getMinMaxIndex().getMaxValues();
-      for (int j = 0; j < maxValue.length; j++) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
-          currentMinValue[j] = minValue[j].clone();
-        }
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue[j]) < 0) {
-          currentMaxValue[j] = maxValue[j].clone();
-        }
-      }
-    }
-
-    BlockletMinMaxIndex minMax = new BlockletMinMaxIndex();
-    minMax.setMaxValues(currentMaxValue);
-    minMax.setMinValues(currentMinValue);
-    blockletIndex.setMinMaxIndex(minMax);
-    return blockletIndex;
-  }
-
-  private ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
-      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
-    ColumnSchema wrapperColumnSchema = new ColumnSchema();
-    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
-    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
-    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
-    wrapperColumnSchema
-        .setDataType(thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type));
-    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
-    List<Encoding> encoders = new ArrayList<Encoding>();
-    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
-      encoders.add(fromExternalToWrapperEncoding(encoder));
-    }
-    wrapperColumnSchema.setEncodingList(encoders);
-    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
-    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
-    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
-    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
-    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
-    wrapperColumnSchema.setAggregateFunction(externalColumnSchema.getAggregate_function());
-    return wrapperColumnSchema;
-  }
-
-  /**
    * Below method is to convert the blocklet info of the thrift to wrapper
    * blocklet info
    *
@@ -273,228 +122,4 @@ public class DataFileFooterConverter {
     blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
     return blockletInfo;
   }
-
-  /**
-   * Below method is convert the thrift encoding to wrapper encoding
-   *
-   * @param encoderThrift thrift encoding
-   * @return wrapper encoding
-   */
-  private Encoding fromExternalToWrapperEncoding(
-      org.apache.carbondata.format.Encoding encoderThrift) {
-    switch (encoderThrift) {
-      case DICTIONARY:
-        return Encoding.DICTIONARY;
-      case DELTA:
-        return Encoding.DELTA;
-      case RLE:
-        return Encoding.RLE;
-      case INVERTED_INDEX:
-        return Encoding.INVERTED_INDEX;
-      case BIT_PACKED:
-        return Encoding.BIT_PACKED;
-      case DIRECT_DICTIONARY:
-        return Encoding.DIRECT_DICTIONARY;
-      default:
-        return Encoding.DICTIONARY;
-    }
-  }
-
-  /**
-   * Below method will be used to convert the thrift compression to wrapper
-   * compression codec
-   *
-   * @param compressionCodecThrift
-   * @return wrapper compression codec
-   */
-  private CompressionCodec getCompressionCodec(
-      org.apache.carbondata.format.CompressionCodec compressionCodecThrift) {
-    switch (compressionCodecThrift) {
-      case SNAPPY:
-        return CompressionCodec.SNAPPY;
-      default:
-        return CompressionCodec.SNAPPY;
-    }
-  }
-
-  /**
-   * Below method will be used to convert thrift segment object to wrapper
-   * segment object
-   *
-   * @param segmentInfo thrift segment info object
-   * @return wrapper segment info object
-   */
-  private SegmentInfo getSegmentInfo(org.apache.carbondata.format.SegmentInfo segmentInfo) {
-    SegmentInfo info = new SegmentInfo();
-    int[] cardinality = new int[segmentInfo.getColumn_cardinalities().size()];
-    for (int i = 0; i < cardinality.length; i++) {
-      cardinality[i] = segmentInfo.getColumn_cardinalities().get(i);
-    }
-    info.setColumnCardinality(cardinality);
-    info.setNumberOfColumns(segmentInfo.getNum_cols());
-    return info;
-  }
-
-  /**
-   * Below method will be used to convert the blocklet index of thrift to
-   * wrapper
-   *
-   * @param blockletIndexThrift
-   * @return blocklet index wrapper
-   */
-  private BlockletIndex getBlockletIndex(
-      org.apache.carbondata.format.BlockletIndex blockletIndexThrift) {
-    org.apache.carbondata.format.BlockletBTreeIndex btreeIndex =
-        blockletIndexThrift.getB_tree_index();
-    org.apache.carbondata.format.BlockletMinMaxIndex minMaxIndex =
-        blockletIndexThrift.getMin_max_index();
-    return new BlockletIndex(
-        new BlockletBTreeIndex(btreeIndex.getStart_key(), btreeIndex.getEnd_key()),
-        new BlockletMinMaxIndex(minMaxIndex.getMin_values(), minMaxIndex.getMax_values()));
-  }
-
-  /**
-   * Below method will be used to convert the thrift compression meta to
-   * wrapper chunk compression meta
-   *
-   * @param chunkCompressionMetaThrift
-   * @return chunkCompressionMetaWrapper
-   */
-  private ChunkCompressorMeta getChunkCompressionMeta(
-      org.apache.carbondata.format.ChunkCompressionMeta chunkCompressionMetaThrift) {
-    ChunkCompressorMeta compressorMeta = new ChunkCompressorMeta();
-    compressorMeta
-        .setCompressor(getCompressionCodec(chunkCompressionMetaThrift.getCompression_codec()));
-    compressorMeta.setCompressedSize(chunkCompressionMetaThrift.getTotal_compressed_size());
-    compressorMeta.setUncompressedSize(chunkCompressionMetaThrift.getTotal_uncompressed_size());
-    return compressorMeta;
-  }
-
-  /**
-   * Below method will be used to convert the thrift data type to wrapper data
-   * type
-   *
-   * @param dataTypeThrift
-   * @return dataType wrapper
-   */
-  private DataType thriftDataTyopeToWrapperDataType(
-      org.apache.carbondata.format.DataType dataTypeThrift) {
-    switch (dataTypeThrift) {
-      case STRING:
-        return DataType.STRING;
-      case SHORT:
-        return DataType.SHORT;
-      case INT:
-        return DataType.INT;
-      case LONG:
-        return DataType.LONG;
-      case DOUBLE:
-        return DataType.DOUBLE;
-      case DECIMAL:
-        return DataType.DECIMAL;
-      case TIMESTAMP:
-        return DataType.TIMESTAMP;
-      case ARRAY:
-        return DataType.ARRAY;
-      case STRUCT:
-        return DataType.STRUCT;
-      default:
-        return DataType.STRING;
-    }
-  }
-
-  /**
-   * Below method will be used to convert the thrift presence meta to wrapper
-   * presence meta
-   *
-   * @param presentMetadataThrift
-   * @return wrapper presence meta
-   */
-  private PresenceMeta getPresenceMeta(
-      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    PresenceMeta presenceMeta = new PresenceMeta();
-    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
-    presenceMeta.setBitSet(BitSet.valueOf(presentMetadataThrift.getPresent_bit_stream()));
-    return presenceMeta;
-  }
-
-  /**
-   * Below method will be used to convert the thrift object to wrapper object
-   *
-   * @param sortStateThrift
-   * @return wrapper sort state object
-   */
-  private SortState getSortState(org.apache.carbondata.format.SortState sortStateThrift) {
-    if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_EXPLICIT) {
-      return SortState.SORT_EXPLICT;
-    } else if (sortStateThrift == org.apache.carbondata.format.SortState.SORT_NATIVE) {
-      return SortState.SORT_NATIVE;
-    } else {
-      return SortState.SORT_NONE;
-    }
-  }
-
-  /**
-   * Below method will be used to convert the thrift data chunk to wrapper
-   * data chunk
-   *
-   * @param datachunkThrift
-   * @return wrapper data chunk
-   */
-  private DataChunk getDataChunk(org.apache.carbondata.format.DataChunk datachunkThrift,
-      boolean isPresenceMetaPresent) {
-    DataChunk dataChunk = new DataChunk();
-    dataChunk.setColumnUniqueIdList(datachunkThrift.getColumn_ids());
-    dataChunk.setDataPageLength(datachunkThrift.getData_page_length());
-    dataChunk.setDataPageOffset(datachunkThrift.getData_page_offset());
-    if (isPresenceMetaPresent) {
-      dataChunk.setNullValueIndexForColumn(getPresenceMeta(datachunkThrift.getPresence()));
-    }
-    dataChunk.setRlePageLength(datachunkThrift.getRle_page_length());
-    dataChunk.setRlePageOffset(datachunkThrift.getRle_page_offset());
-    dataChunk.setRowMajor(datachunkThrift.isRowMajor());
-    dataChunk.setRowIdPageLength(datachunkThrift.getRowid_page_length());
-    dataChunk.setRowIdPageOffset(datachunkThrift.getRowid_page_offset());
-    dataChunk.setSortState(getSortState(datachunkThrift.getSort_state()));
-    dataChunk.setChunkCompressionMeta(getChunkCompressionMeta(datachunkThrift.getChunk_meta()));
-    List<Encoding> encodingList = new ArrayList<Encoding>(datachunkThrift.getEncoders().size());
-    for (int i = 0; i < datachunkThrift.getEncoders().size(); i++) {
-      encodingList.add(fromExternalToWrapperEncoding(datachunkThrift.getEncoders().get(i)));
-    }
-    dataChunk.setEncoderList(encodingList);
-    if (encodingList.contains(Encoding.DELTA)) {
-      List<ByteBuffer> thriftEncoderMeta = datachunkThrift.getEncoder_meta();
-      List<ValueEncoderMeta> encodeMetaList =
-          new ArrayList<ValueEncoderMeta>(thriftEncoderMeta.size());
-      for (int i = 0; i < thriftEncoderMeta.size(); i++) {
-        encodeMetaList.add(deserializeEncoderMeta(thriftEncoderMeta.get(i).array()));
-      }
-      dataChunk.setValueEncoderMeta(encodeMetaList);
-    }
-    return dataChunk;
-  }
-
-  /**
-   * Below method will be used to convert the encode metadata to
-   * ValueEncoderMeta object
-   *
-   * @param encoderMeta
-   * @return ValueEncoderMeta object
-   */
-  private ValueEncoderMeta deserializeEncoderMeta(byte[] encoderMeta) {
-    // TODO : should remove the unnecessary fields.
-    ByteArrayInputStream aos = null;
-    ObjectInputStream objStream = null;
-    ValueEncoderMeta meta = null;
-    try {
-      aos = new ByteArrayInputStream(encoderMeta);
-      objStream = new ObjectInputStream(aos);
-      meta = (ValueEncoderMeta) objStream.readObject();
-    } catch (ClassNotFoundException e) {
-      LOGGER.error(e);
-    } catch (IOException e) {
-      CarbonUtil.closeStreams(objStream);
-    }
-    return meta;
-  }
 }
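
Editor's sketch: the refactored entry point now derives the file path, offset, and length from the TableBlockInfo instead of taking them as separate arguments. A minimal caller sketch, assuming the CarbonData classes above are on the classpath; the wrapper class and method name here are illustrative only:

import java.io.IOException;

import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.util.DataFileFooterConverter;

public class FooterReadSketch {
  // Reads the footer for one block; TableBlockInfo now carries the
  // file path and block length that were previously passed as arguments.
  static DataFileFooter readFooter(TableBlockInfo blockInfo) throws IOException {
    DataFileFooterConverter converter = new DataFileFooterConverter();
    return converter.readDataFileFooter(blockInfo);
  }
}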

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
new file mode 100644
index 0000000..d971756
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonFooterReader;
+import org.apache.carbondata.format.FileFooter;
+
+/**
+ * Below class will be used to convert the thrift object of data file
+ * metadata to a wrapper object for version 2 data files
+ */
+public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
+
+  /**
+   * Below method will be used to convert thrift file meta to wrapper file meta
+   */
+  @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
+      throws IOException {
+    DataFileFooter dataFileFooter = new DataFileFooter();
+    CarbonFooterReader reader =
+        new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
+    FileFooter footer = reader.readFooter();
+    dataFileFooter.setVersionId((short) footer.getVersion());
+    dataFileFooter.setNumberOfRows(footer.getNum_rows());
+    dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
+    for (int i = 0; i < table_columns.size(); i++) {
+      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+    }
+    dataFileFooter.setColumnInTable(columnSchemaList);
+
+    List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
+        footer.getBlocklet_index_list();
+    List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>();
+    for (int i = 0; i < leaf_node_indices_Thrift.size(); i++) {
+      BlockletIndex blockletIndex = getBlockletIndex(leaf_node_indices_Thrift.get(i));
+      blockletIndexList.add(blockletIndex);
+    }
+    List<org.apache.carbondata.format.BlockletInfo2> leaf_node_infos_Thrift =
+        footer.getBlocklet_info_list2();
+    List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
+    for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
+      BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i),
+          getNumberOfDimensionColumns(columnSchemaList));
+      blockletInfo.setBlockletIndex(blockletIndexList.get(i));
+      blockletInfoList.add(blockletInfo);
+    }
+    dataFileFooter.setBlockletList(blockletInfoList);
+    dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
+    return dataFileFooter;
+  }
+
+  /**
+   * Below method is to convert the thrift blocklet info to the wrapper
+   * blocklet info
+   *
+   * @param blockletInfoThrift       blocklet info of the thrift
+   * @param numberOfDimensionColumns number of dimension columns in the block
+   * @return blocklet info wrapper
+   */
+  private BlockletInfo getBlockletInfo(
+      org.apache.carbondata.format.BlockletInfo2 blockletInfoThrift, int numberOfDimensionColumns) {
+    BlockletInfo blockletInfo = new BlockletInfo();
+    List<Long> dimensionColumnChunkOffsets =
+        blockletInfoThrift.getColumn_data_chunks_offsets().subList(0, numberOfDimensionColumns);
+    List<Long> measureColumnChunksOffsets = blockletInfoThrift.getColumn_data_chunks_offsets()
+        .subList(numberOfDimensionColumns,
+            blockletInfoThrift.getColumn_data_chunks_offsets().size());
+    List<Short> dimensionColumnChunkLength =
+        blockletInfoThrift.getColumn_data_chunks_length().subList(0, numberOfDimensionColumns);
+    List<Short> measureColumnChunksLength = blockletInfoThrift.getColumn_data_chunks_length()
+        .subList(numberOfDimensionColumns,
+            blockletInfoThrift.getColumn_data_chunks_length().size());
+    blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets);
+    blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets);
+    blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength);
+    blockletInfo.setMeasureChunksLength(measureColumnChunksLength);
+    blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
+    return blockletInfo;
+  }
+
+  /**
+   * Below method will be used to get the number of dimension columns
+   * in the carbon column schema
+   *
+   * @param columnSchemaList column schema list
+   * @return number of dimension columns
+   */
+  private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
+    int numberOfDimensionColumns = 0;
+    int previousColumnGroupId = -1;
+    ColumnSchema columnSchema = null;
+    for (int i = 0; i < columnSchemaList.size(); i++) {
+      columnSchema = columnSchemaList.get(i);
+      if (columnSchema.isDimensionColumn() && columnSchema.isColumnar()) {
+        numberOfDimensionColumns++;
+      } else if (columnSchema.isDimensionColumn()) {
+        if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
+          previousColumnGroupId = columnSchema.getColumnGroupId();
+          numberOfDimensionColumns++;
+        }
+      } else {
+        break;
+      }
+    }
+    return numberOfDimensionColumns;
+  }
+
+}
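
Editor's sketch: the V2 format stores every column chunk offset and length in one flat list, dimension chunks first and measure chunks after, and the converter above splits each list at the dimension count. A self-contained sketch of that split using plain lists; the values are made up for illustration:

import java.util.Arrays;
import java.util.List;

public class ChunkSplitSketch {
  public static void main(String[] args) {
    // Combined offsets: 3 dimension chunks followed by 2 measure chunks.
    List<Long> offsets = Arrays.asList(0L, 120L, 260L, 400L, 900L);
    int numberOfDimensionColumns = 3;
    List<Long> dimensionOffsets = offsets.subList(0, numberOfDimensionColumns);
    List<Long> measureOffsets = offsets.subList(numberOfDimensionColumns, offsets.size());
    System.out.println("dimensions: " + dimensionOffsets); // [0, 120, 260]
    System.out.println("measures: " + measureOffsets);     // [400, 900]
  }
}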

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
new file mode 100644
index 0000000..a079ad7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.util;
+
+
+/**
+ * Factory class to get the data file footer converter based on the data file version
+ */
+public class DataFileFooterConverterFactory {
+
+  /**
+   * static instance
+   */
+  private static final DataFileFooterConverterFactory FOOTER_CONVERTER_FACTORY =
+      new DataFileFooterConverterFactory();
+
+  /**
+   * private constructor
+   */
+  private DataFileFooterConverterFactory() {
+  }
+
+  /**
+   * Below method will be used to get the instance of this class
+   *
+   * @return DataFileFooterConverterFactory instance
+   */
+  public static DataFileFooterConverterFactory getInstance() {
+    return FOOTER_CONVERTER_FACTORY;
+  }
+
+  /**
+   * Method will be used to get the file footer converter instance based on version
+   *
+   * @param versionNumber data file format version number
+   * @return footer reader instance
+   */
+  public AbstractDataFileFooterConverter getDataFileFooterConverter(final short versionNumber) {
+    switch (versionNumber) {
+      case 2:
+        return new DataFileFooterConverter2();
+      default:
+        return new DataFileFooterConverter();
+    }
+  }
+
+}
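
Editor's sketch: a minimal usage example for the factory. The version value would normally come from the index header or the footer itself rather than being hardcoded as it is here:

import org.apache.carbondata.core.util.AbstractDataFileFooterConverter;
import org.apache.carbondata.core.util.DataFileFooterConverterFactory;

public class FooterConverterFactorySketch {
  public static void main(String[] args) {
    short version = 2; // illustrative; normally read from the file metadata
    AbstractDataFileFooterConverter converter =
        DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
    // version 2 resolves to DataFileFooterConverter2, anything else to the V1 converter
    System.out.println(converter.getClass().getSimpleName());
  }
}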

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
index 04d2b97..758c2d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonFooterWriter.java
@@ -28,9 +28,6 @@ import org.apache.carbondata.format.FileFooter;
  */
 public class CarbonFooterWriter {
 
-  // It is version number of this format class.
-  private static int VERSION_NUMBER = 1;
-
   // Fact file path
   private String filePath;
 
@@ -48,7 +45,6 @@ public class CarbonFooterWriter {
   public void writeFooter(FileFooter footer, long currentPosition) throws IOException {
 
     ThriftWriter thriftWriter = openThriftWriter(filePath);
-    footer.setVersion(VERSION_NUMBER);
     try {
       thriftWriter.write(footer);
       thriftWriter.writeOffset(currentPosition);
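
Editor's sketch: with the hardcoded VERSION_NUMBER removed, the producer of the FileFooter is now responsible for stamping the version before calling writeFooter. A hedged sketch of that responsibility shift; the version value shown is illustrative:

import org.apache.carbondata.format.FileFooter;

public class FooterVersionSketch {
  // The writer no longer forces version 1, so whoever builds the
  // footer sets the format version explicitly before it is written.
  static void stampVersion(FileFooter footer, int formatVersion) {
    footer.setVersion(formatVersion);
  }
}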

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index 85e979a..8289c8b 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -18,7 +18,12 @@
  */
 package org.apache.carbondata.scan.executor.impl;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -33,7 +38,8 @@ import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.carbon.querystatistics.*;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic;
+import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
@@ -90,7 +96,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
     queryProperties.queryStatisticsRecorder =
-            CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
+        CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId());
     queryModel.setStatisticsRecorder(queryProperties.queryStatisticsRecorder);
     QueryUtil.resolveQueryModel(queryModel);
     QueryStatistic queryStatistic = new QueryStatistic();
@@ -143,9 +149,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     // and measure column start index
     queryProperties.aggExpressionStartIndex = queryModel.getQueryMeasures().size();
     queryProperties.measureStartIndex = aggTypes.length - queryModel.getQueryMeasures().size();
+    queryProperties.filterMeasures = new HashSet<>();
+    queryProperties.complexFilterDimension = new HashSet<>();
+    QueryUtil.getAllFilterDimensions(queryModel.getFilterExpressionResolverTree(),
+        queryProperties.complexFilterDimension, queryProperties.filterMeasures);
 
-    queryProperties.complexFilterDimension =
-        QueryUtil.getAllFilterDimensions(queryModel.getFilterExpressionResolverTree());
     queryStatistic = new QueryStatistic();
     // dictionary column unique column id to dictionary mapping
     // which will be used to get column actual data
@@ -314,13 +322,38 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     List<CarbonMeasure> expressionMeasures =
         new ArrayList<CarbonMeasure>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     // setting all the dimension chunk indexes to be read from file
-    blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(QueryUtil
-        .getDimensionsBlockIndexes(updatedQueryDimension,
-            segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions));
-    // setting all the measure chunk indexes to be read from file
-    blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(QueryUtil
+    int numberOfElementToConsider = 0;
+    int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
+        segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
+        queryProperties.complexFilterDimension);
+    if (dimensionsBlockIndexes.length > 0) {
+      numberOfElementToConsider = dimensionsBlockIndexes[dimensionsBlockIndexes.length - 1]
+          == segmentProperties.getBlockTodimensionOrdinalMapping().size() - 1 ?
+          dimensionsBlockIndexes.length - 1 :
+          dimensionsBlockIndexes.length;
+      blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(CarbonUtil
+          .getRangeIndex(dimensionsBlockIndexes, numberOfElementToConsider,
+              CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+    } else {
+      blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
+    }
+
+    int[] measureBlockIndexes = QueryUtil
         .getMeasureBlockIndexes(queryModel.getQueryMeasures(), expressionMeasures,
-            segmentProperties.getMeasuresOrdinalToBlockMapping()));
+            segmentProperties.getMeasuresOrdinalToBlockMapping(), queryProperties.filterMeasures);
+    if (measureBlockIndexes.length > 0) {
+      numberOfElementToConsider = measureBlockIndexes[measureBlockIndexes.length - 1]
+          == segmentProperties.getMeasures().size() - 1 ?
+          measureBlockIndexes.length - 1 :
+          measureBlockIndexes.length;
+      // setting all the measure chunk indexes to be read from file
+      blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(CarbonUtil
+          .getRangeIndex(measureBlockIndexes, numberOfElementToConsider,
+              CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+    } else {
+      blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]);
+    }
     // setting the key structure info which will be required
     // to update the older block key with new key generator
     blockExecutionInfo.setKeyStructureInfo(queryProperties.keyStructureInfo);
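
Editor's sketch: the query-side change batches adjacent column chunk reads by collapsing sorted block indexes into contiguous [start, end] ranges, capped by NUMBER_OF_COLUMN_READ_IN_IO, so one IO call can fetch several neighboring chunks. The real logic lives in CarbonUtil.getRangeIndex; the standalone sketch below only approximates its behavior:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RangeIndexSketch {
  // Groups sorted block indexes into contiguous [start, end] ranges,
  // capping each range at maxPerRange indexes per IO read.
  static int[][] toRanges(int[] sortedIndexes, int count, int maxPerRange) {
    List<int[]> ranges = new ArrayList<>();
    int start = sortedIndexes[0];
    int prev = start;
    for (int i = 1; i < count; i++) {
      int current = sortedIndexes[i];
      if (current == prev + 1 && current - start + 1 <= maxPerRange) {
        prev = current; // still contiguous and under the cap: extend the range
      } else {
        ranges.add(new int[] { start, prev });
        start = prev = current;
      }
    }
    ranges.add(new int[] { start, prev });
    return ranges.toArray(new int[0][]);
  }

  public static void main(String[] args) {
    int[] dimensionsBlockIndexes = { 0, 1, 2, 5, 6, 9 };
    for (int[] range : toRanges(dimensionsBlockIndexes, dimensionsBlockIndexes.length, 10)) {
      System.out.println(Arrays.toString(range)); // [0, 2] then [5, 6] then [9, 9]
    }
  }
}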