You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/12/01 09:53:57 UTC

[4/5] incubator-carbondata git commit: Improve first time query performance

Improve first time query performance

Rebased


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d54dc647
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d54dc647
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d54dc647

Branch: refs/heads/master
Commit: d54dc647c69496ecaa7e0c8a9cc3d8e9028ab73f
Parents: 9ad98f4
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Oct 27 22:54:49 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 1 15:18:35 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/datastore/BlockIndexStore.java  |   4 +-
 .../core/carbon/datastore/DataRefNode.java      |  18 +-
 .../datastore/block/SegmentProperties.java      |   7 +
 .../carbon/datastore/block/TableBlockInfo.java  |  20 +-
 .../chunk/reader/CarbonDataReaderFactory.java   |  93 +++++
 .../reader/DimensionColumnChunkReader.java      |   2 +-
 .../chunk/reader/MeasureColumnChunkReader.java  |   2 +-
 .../reader/dimension/AbstractChunkReader.java   |  12 +-
 ...CompressedDimensionChunkFileBasedReader.java | 135 -------
 ...mpressedDimensionChunkFileBasedReaderV1.java | 146 +++++++
 ...mpressedDimensionChunkFileBasedReaderV2.java | 308 +++++++++++++++
 .../measure/AbstractMeasureChunkReader.java     |  33 +-
 .../CompressedMeasureChunkFileBasedReader.java  |  92 -----
 ...CompressedMeasureChunkFileBasedReaderV1.java | 106 +++++
 ...CompressedMeasureChunkFileBasedReaderV2.java | 234 +++++++++++
 .../impl/btree/AbstractBTreeLeafNode.java       |   4 +-
 .../datastore/impl/btree/BTreeNonLeafNode.java  |   4 +-
 .../impl/btree/BlockletBTreeLeafNode.java       |  36 +-
 .../carbon/metadata/blocklet/BlockletInfo.java  |  40 ++
 .../metadata/blocklet/DataFileFooter.java       |   6 +-
 .../core/constants/CarbonCommonConstants.java   |  18 +-
 .../util/AbstractDataFileFooterConverter.java   | 390 ++++++++++++++++++
 .../core/util/CarbonMetadataUtil.java           | 158 +++++++-
 .../carbondata/core/util/CarbonProperties.java  | 121 +++---
 .../apache/carbondata/core/util/CarbonUtil.java | 209 ++++++++--
 .../core/util/DataFileFooterConverter.java      | 391 +------------------
 .../core/util/DataFileFooterConverter2.java     | 135 +++++++
 .../util/DataFileFooterConverterFactory.java    |  64 +++
 .../core/writer/CarbonFooterWriter.java         |   4 -
 .../executor/impl/AbstractQueryExecutor.java    |  55 ++-
 .../executor/impl/QueryExecutorProperties.java  |   4 +-
 .../scan/executor/infos/BlockExecutionInfo.java |  12 +-
 .../scan/executor/util/QueryUtil.java           | 102 +++--
 .../scan/scanner/AbstractBlockletScanner.java   |   7 +-
 .../scan/scanner/impl/FilterScanner.java        |  48 ++-
 .../carbon/datastore/block/BlockInfoTest.java   |  12 +-
 .../datastore/block/TableBlockInfoTest.java     |  32 +-
 .../datastore/block/TableTaskInfoTest.java      |   8 +-
 ...ressedDimensionChunkFileBasedReaderTest.java |  23 +-
 ...mpressedMeasureChunkFileBasedReaderTest.java |  30 +-
 .../core/util/CarbonMetadataUtilTest.java       |   3 +-
 .../carbondata/core/util/CarbonUtilTest.java    |  18 +-
 .../core/util/DataFileFooterConverterTest.java  |  11 +-
 format/src/main/thrift/carbondata.thrift        |  30 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |  66 ++--
 .../carbondata/hadoop/CarbonInputSplit.java     | 107 +++--
 .../internal/index/impl/InMemoryBTreeIndex.java |   5 +-
 .../spark/merger/CarbonCompactionUtil.java      |   4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../spark/src/test/resources/OLDFORMATTABLE.csv |  34 ++
 .../src/test/resources/OLDFORMATTABLEHIVE.csv   |  33 ++
 .../TestQueryWithOldCarbonDataFile.scala        |  70 ++++
 .../store/CarbonDataWriterFactory.java          |  70 ++++
 .../store/CarbonFactDataHandlerColumnar.java    |  54 ++-
 .../store/writer/AbstractFactDataWriter.java    | 316 +++++++--------
 .../store/writer/CarbonDataWriterVo.java        | 321 +++++++++++++++
 .../store/writer/CarbonFactDataWriterImpl2.java | 285 ++++++++++++++
 ...actDataWriterImplForIntIndexAndAggBlock.java | 223 +++--------
 .../processing/store/writer/NodeHolder.java     |  38 +-
 .../carbon/datastore/BlockIndexStoreTest.java   |  43 +-
 61 files changed, 3504 insertions(+), 1358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
index 34c2983..d7ba318 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/BlockIndexStore.java
@@ -265,9 +265,7 @@ public class BlockIndexStore {
     AbstractIndex tableBlock;
     DataFileFooter footer;
     // getting the data file meta data of the block
-    footer = CarbonUtil.readMetadatFile(blockInfo.getTableBlockInfo().getFilePath(),
-        blockInfo.getTableBlockInfo().getBlockOffset(),
-        blockInfo.getTableBlockInfo().getBlockLength());
+    footer = CarbonUtil.readMetadatFile(blockInfo.getTableBlockInfo());
     tableBlock = new BlockIndex();
     footer.setBlockInfo(blockInfo);
     // building the block

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
index e81a9a6..0ddd8c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/DataRefNode.java
@@ -71,10 +71,15 @@ public interface DataRefNode {
    * Below method will be used to get the dimension chunks
    *
    * @param fileReader   file reader to read the chunks from file
-   * @param blockIndexes indexes of the blocks need to be read
+   * @param blockIndexes range indexes of the blocks need to be read
+   *                     value can be {{0,10},{11,12},{13,13}}
+   *                     here 0 to 10 and 11 to 12 column blocks will be read in one
+   *                     IO operation; the 13th column block will be read separately.
+   *                     This will be helpful to reduce IO by reading a bigger chunk of
+   *                     data in one IO
    * @return dimension data chunks
    */
-  DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader, int[] blockIndexes);
+  DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader, int[][] blockIndexes);
 
   /**
    * Below method will be used to get the dimension chunk
@@ -89,10 +94,15 @@ public interface DataRefNode {
    * Below method will be used to get the measure chunk
    *
    * @param fileReader   file reader to read the chunk from file
-   * @param blockIndexes block indexes to be read from file
+   * @param blockIndexes range indexes of the blocks need to be read
+   *                     value can be {{0,10},{11,12},{13,13}}
+   *                     here 0 to 10 and 11 to 12 column blocks will be read in one
+   *                     IO operation; the 13th column block will be read separately.
+   *                     This will be helpful to reduce IO by reading a bigger chunk of
+   *                     data in one IO
    * @return measure column data chunk
    */
-  MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader, int[] blockIndexes);
+  MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader, int[][] blockIndexes);
 
   /**
    * Below method will be used to read the measure chunk

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java
index 816ca3a..05ad4e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/SegmentProperties.java
@@ -745,4 +745,11 @@ public class SegmentProperties {
     return blockTodimensionOrdinalMapping.get(blockIndex);
   }
 
+  /**
+   * @return It returns block index to dimension ordinal mapping
+   */
+  public Map<Integer, Set<Integer>> getBlockTodimensionOrdinalMapping() {
+    return blockTodimensionOrdinalMapping;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index 4bf0047..0d60567 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -56,18 +56,21 @@ public class TableBlockInfo implements Distributable, Serializable {
   private String segmentId;
 
   private String[] locations;
+
+  private short version;
   /**
    * The class holds the blockletsinfo
    */
   private BlockletInfos blockletInfos = new BlockletInfos();
 
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength) {
+      long blockLength, short version) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
     this.locations = locations;
     this.blockLength = blockLength;
+    this.version = version;
   }
 
   /**
@@ -81,13 +84,14 @@ public class TableBlockInfo implements Distributable, Serializable {
    * @param blockletInfos
    */
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength, BlockletInfos blockletInfos) {
+      long blockLength, BlockletInfos blockletInfos, short version) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
     this.locations = locations;
     this.blockLength = blockLength;
     this.blockletInfos = blockletInfos;
+    this.version = version;
   }
 
   /**
@@ -104,6 +108,10 @@ public class TableBlockInfo implements Distributable, Serializable {
     return blockOffset;
   }
 
+  public void setBlockOffset(long blockOffset) {
+    this.blockOffset = blockOffset;
+  }
+
   /**
    * @return the segmentId
    */
@@ -250,4 +258,12 @@ public class TableBlockInfo implements Distributable, Serializable {
   public void setBlockletInfos(BlockletInfos blockletInfos) {
     this.blockletInfos = blockletInfos;
   }
+
+  public short getVersion() {
+    return version;
+  }
+
+  public void setVersion(short version) {
+    this.version = version;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
new file mode 100644
index 0000000..08a1869
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.reader;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v2.CompressedMeasureChunkFileBasedReaderV2;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
+
+/**
+ * Factory class to get the data reader instance based on version
+ */
+public class CarbonDataReaderFactory {
+
+  /**
+   * static instance
+   */
+  private static final CarbonDataReaderFactory CARBON_DATA_READER_FACTORY =
+      new CarbonDataReaderFactory();
+
+  /**
+   * private constructor
+   */
+  private CarbonDataReaderFactory() {
+
+  }
+
+  /**
+   * To get the instance of the reader factory
+   *
+   * @return reader factory
+   */
+  public static CarbonDataReaderFactory getInstance() {
+    return CARBON_DATA_READER_FACTORY;
+  }
+
+  /**
+   * Below method will be used to get the dimension column chunk reader based on version number
+   *
+   * @param version             reader version
+   * @param blockletInfo        blocklet info
+   * @param eachColumnValueSize size of each dimension column
+   * @param filePath            carbon data file path
+   * @return dimension column data reader based on version number
+   */
+  public DimensionColumnChunkReader getDimensionColumnChunkReader(short version,
+      BlockletInfo blockletInfo, int[] eachColumnValueSize, String filePath) {
+    switch (version) {
+      case 2:
+        return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, eachColumnValueSize,
+            filePath);
+      default:
+        return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, eachColumnValueSize,
+            filePath);
+    }
+  }
+
+  /**
+   * Below method will be used to get the measure column chunk reader based on version number
+   *
+   * @param version      reader version
+   * @param blockletInfo blocklet info
+   * @param filePath     carbon data file path
+   * @return measure column data reader based on version number
+   */
+  public MeasureColumnChunkReader getMeasureColumnChunkReader(short version,
+      BlockletInfo blockletInfo, String filePath) {
+    switch (version) {
+      case 2:
+        return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, filePath);
+      default:
+        return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, filePath);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/DimensionColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/DimensionColumnChunkReader.java
index b958245..0153211 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/DimensionColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -35,7 +35,7 @@ public interface DimensionColumnChunkReader {
    * @param blockIndexes blocks to be read
    * @return dimension column chunks
    */
-  DimensionColumnDataChunk[] readDimensionChunks(FileHolder fileReader, int... blockIndexes);
+  DimensionColumnDataChunk[] readDimensionChunks(FileHolder fileReader, int[][] blockIndexes);
 
   /**
    * Below method will be used to read the chunk based on block index

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/MeasureColumnChunkReader.java
index 8a7c8ef..523a14e 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -33,7 +33,7 @@ public interface MeasureColumnChunkReader {
    * @param blockIndexes blocks to be read
    * @return measure data chunks
    */
-  MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, int... blockIndexes);
+  MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, int[][] blockIndexes);
 
   /**
    * Method to read the blocks data based on block index

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
index 59dcd38..ced33fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.carbon.datastore.chunk.reader.DimensionColumnChunkReader;
-import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression;
@@ -43,12 +42,6 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
       SnappyCompression.SnappyByteCompression.INSTANCE;
 
   /**
-   * data chunk list which holds the information
-   * about the data block metadata
-   */
-  protected List<DataChunk> dimensionColumnChunk;
-
-  /**
    * size of the each column value
    * for no dictionary column it will be -1
    */
@@ -79,9 +72,8 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
    * @param eachColumnValueSize  size of the each column value
    * @param filePath             file from which data will be read
    */
-  public AbstractChunkReader(List<DataChunk> dimensionColumnChunk, int[] eachColumnValueSize,
-      String filePath) {
-    this.dimensionColumnChunk = dimensionColumnChunk;
+  public AbstractChunkReader(final int[] eachColumnValueSize,
+      final String filePath) {
     this.eachColumnValueSize = eachColumnValueSize;
     this.filePath = filePath;
     int numberOfElement = 0;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReader.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReader.java
deleted file mode 100644
index 209217b..0000000
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReader.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension;
-
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
-import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
-import org.apache.carbondata.core.carbon.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
-import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
-import org.apache.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
-import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Compressed dimension chunk reader class
- */
-public class CompressedDimensionChunkFileBasedReader extends AbstractChunkReader {
-
-  /**
-   * Constructor to get minimum parameter to create instance of this class
-   *
-   * @param dimensionColumnChunk dimension chunk metadata
-   * @param eachColumnValueSize  size of the each column value
-   * @param filePath             file from which data will be read
-   */
-  public CompressedDimensionChunkFileBasedReader(List<DataChunk> dimensionColumnChunk,
-      int[] eachColumnValueSize, String filePath) {
-    super(dimensionColumnChunk, eachColumnValueSize, filePath);
-  }
-
-  /**
-   * Below method will be used to read the chunk based on block indexes
-   *
-   * @param fileReader   file reader to read the blocks from file
-   * @param blockIndexes blocks to be read
-   * @return dimension column chunks
-   */
-  @Override public DimensionColumnDataChunk[] readDimensionChunks(FileHolder fileReader,
-      int... blockIndexes) {
-    // read the column chunk based on block index and add
-    DimensionColumnDataChunk[] dataChunks =
-        new DimensionColumnDataChunk[dimensionColumnChunk.size()];
-    for (int i = 0; i < blockIndexes.length; i++) {
-      dataChunks[blockIndexes[i]] = readDimensionChunk(fileReader, blockIndexes[i]);
-    }
-    return dataChunks;
-  }
-
-  /**
-   * Below method will be used to read the chunk based on block index
-   *
-   * @param fileReader file reader to read the blocks from file
-   * @param blockIndex block to be read
-   * @return dimension column chunk
-   */
-  @Override public DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader,
-      int blockIndex) {
-    byte[] dataPage = null;
-    int[] invertedIndexes = null;
-    int[] invertedIndexesReverse = null;
-    int[] rlePage = null;
-
-    // first read the data and uncompressed it
-    dataPage = COMPRESSOR.unCompress(fileReader
-        .readByteArray(filePath, dimensionColumnChunk.get(blockIndex).getDataPageOffset(),
-            dimensionColumnChunk.get(blockIndex).getDataPageLength()));
-    // if row id block is present then read the row id chunk and uncompress it
-    if (CarbonUtil.hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(),
-        Encoding.INVERTED_INDEX)) {
-      invertedIndexes = CarbonUtil
-          .getUnCompressColumnIndex(dimensionColumnChunk.get(blockIndex).getRowIdPageLength(),
-              fileReader.readByteArray(filePath,
-                  dimensionColumnChunk.get(blockIndex).getRowIdPageOffset(),
-                  dimensionColumnChunk.get(blockIndex).getRowIdPageLength()), numberComressor);
-      // get the reverse index
-      invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
-    }
-    // if rle is applied then read the rle block chunk and then uncompress
-    //then actual data based on rle block
-    if (CarbonUtil
-        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), Encoding.RLE)) {
-      // read and uncompress the rle block
-      rlePage = numberComressor.unCompress(fileReader
-          .readByteArray(filePath, dimensionColumnChunk.get(blockIndex).getRlePageOffset(),
-              dimensionColumnChunk.get(blockIndex).getRlePageLength()));
-      // uncompress the data with rle indexes
-      dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
-      rlePage = null;
-    }
-    // fill chunk attributes
-    DimensionChunkAttributes chunkAttributes = new DimensionChunkAttributes();
-    chunkAttributes.setEachRowSize(eachColumnValueSize[blockIndex]);
-    chunkAttributes.setInvertedIndexes(invertedIndexes);
-    chunkAttributes.setInvertedIndexesReverse(invertedIndexesReverse);
-    DimensionColumnDataChunk columnDataChunk = null;
-
-    if (dimensionColumnChunk.get(blockIndex).isRowMajor()) {
-      // to store fixed length column chunk values
-      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, chunkAttributes);
-    }
-    // if no dictionary column then first create a no dictionary column chunk
-    // and set to data chunk instance
-    else if (!CarbonUtil
-        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), Encoding.DICTIONARY)) {
-      columnDataChunk =
-          new VariableLengthDimensionDataChunk(getNoDictionaryDataChunk(dataPage), chunkAttributes);
-      chunkAttributes.setNoDictionary(true);
-    } else {
-      // to store fixed length column chunk values
-      columnDataChunk = new FixedLengthDimensionDataChunk(dataPage, chunkAttributes);
-    }
-    return columnDataChunk;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
new file mode 100644
index 0000000..3ead985
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v1;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Compressed dimension chunk reader class
+ */
+public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkReader {
+
+  /**
+   * data chunk list which holds the information
+   * about the data block metadata
+   */
+  private final List<DataChunk> dimensionColumnChunk;
+
+  /**
+   * Creates a reader over the dimension column chunks described by the given blocklet.
+   *
+   * @param blockletInfo        blocklet metadata; its dimension column chunk list is retained
+   * @param eachColumnValueSize size of each column value, passed on to the super class
+   * @param filePath            file from which data will be read, passed on to the super class
+   */
+  public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo blockletInfo,
+      final int[] eachColumnValueSize, final String filePath) {
+    super(eachColumnValueSize, filePath);
+    this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
+  }
+
+  /**
+   * Reads every dimension column chunk whose index falls in one of the given ranges.
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes index ranges to read; each entry is an inclusive {start, end} pair
+   * @return array sized to the chunk metadata list; positions not requested stay null
+   */
+  @Override public DimensionColumnDataChunk[] readDimensionChunks(FileHolder fileReader,
+      int[][] blockIndexes) {
+    // result is indexed by column position, so only the requested slots are filled
+    DimensionColumnDataChunk[] dataChunks =
+        new DimensionColumnDataChunk[dimensionColumnChunk.size()];
+    for (int i = 0; i < blockIndexes.length; i++) {
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        dataChunks[j] = readDimensionChunk(fileReader, j);
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Reads and uncompresses a single dimension column chunk, one file read per page.
+   *
+   * @param fileReader file reader to read the blocks from file
+   * @param blockIndex index of the column chunk to be read
+   * @return dimension column chunk built from the uncompressed pages
+   */
+  @Override public DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader,
+      int blockIndex) {
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+
+    // first read the data page and uncompress it
+    dataPage = COMPRESSOR.unCompress(fileReader
+        .readByteArray(filePath, dimensionColumnChunk.get(blockIndex).getDataPageOffset(),
+            dimensionColumnChunk.get(blockIndex).getDataPageLength()));
+    // if the chunk has an inverted index then read the row id page and uncompress it
+    if (CarbonUtil.hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(),
+        Encoding.INVERTED_INDEX)) {
+      invertedIndexes = CarbonUtil
+          .getUnCompressColumnIndex(dimensionColumnChunk.get(blockIndex).getRowIdPageLength(),
+              fileReader.readByteArray(filePath,
+                  dimensionColumnChunk.get(blockIndex).getRowIdPageOffset(),
+                  dimensionColumnChunk.get(blockIndex).getRowIdPageLength()), numberComressor);
+      // get the reverse index
+      invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+    }
+    // if rle is applied then read the rle page and use it to expand
+    // the already uncompressed data page
+    if (CarbonUtil
+        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), Encoding.RLE)) {
+      // read and uncompress the rle page
+      rlePage = numberComressor.unCompress(fileReader
+          .readByteArray(filePath, dimensionColumnChunk.get(blockIndex).getRlePageOffset(),
+              dimensionColumnChunk.get(blockIndex).getRlePageLength()));
+      // expand the data page using the rle indexes
+      dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
+      rlePage = null;
+    }
+    // fill chunk attributes
+    DimensionChunkAttributes chunkAttributes = new DimensionChunkAttributes();
+    chunkAttributes.setEachRowSize(eachColumnValueSize[blockIndex]);
+    chunkAttributes.setInvertedIndexes(invertedIndexes);
+    chunkAttributes.setInvertedIndexesReverse(invertedIndexesReverse);
+    DimensionColumnDataChunk columnDataChunk = null;
+
+    if (dimensionColumnChunk.get(blockIndex).isRowMajor()) {
+      // row major chunk is wrapped as a column group chunk
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, chunkAttributes);
+    }
+    // if no dictionary column then create a variable length chunk from the
+    // no dictionary data and mark the attributes as no dictionary
+    else if (!CarbonUtil
+        .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), Encoding.DICTIONARY)) {
+      columnDataChunk =
+          new VariableLengthDimensionDataChunk(getNoDictionaryDataChunk(dataPage), chunkAttributes);
+      chunkAttributes.setNoDictionary(true);
+    } else {
+      // otherwise store fixed length column chunk values
+      columnDataChunk = new FixedLengthDimensionDataChunk(dataPage, chunkAttributes);
+    }
+    return columnDataChunk;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
new file mode 100644
index 0000000..af83514
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v2;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
+import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Compressed dimension chunk reader class for version 2
+ */
+public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReader {
+
+  /**
+   * dimension chunks offset
+   */
+  private List<Long> dimensionChunksOffset;
+
+  /**
+   * dimension chunks length
+   */
+  private List<Short> dimensionChunksLength;
+
+  /**
+   * Creates a reader that locates dimension chunks through the blocklet's offset and length lists.
+   *
+   * @param blockletInfo        blocklet metadata supplying the dimension chunk offsets and lengths
+   * @param eachColumnValueSize size of each column value, passed on to the super class
+   * @param filePath            file from which data will be read, passed on to the super class
+   */
+  public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
+      final int[] eachColumnValueSize, final String filePath) {
+    super(eachColumnValueSize, filePath);
+    this.dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
+    this.dimensionChunksLength = blockletInfo.getDimensionChunksLength();
+
+  }
+
+  /**
+   * Reads the dimension column chunks for the given index ranges.
+   * Every range except the final one is fetched with a single grouped file read and
+   * then split into chunks. The final range is read as a single chunk when it starts
+   * at the last column; otherwise it is also read as a group.
+   * NOTE(review): a final range that ends at, but does not start at, the last column
+   * would go through the group read - confirm callers never build such a range.
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes index ranges to read; each entry is an inclusive {start, end} pair
+   * @return array sized to the chunk offset list; positions not requested stay null
+   */
+  @Override public DimensionColumnDataChunk[] readDimensionChunks(final FileHolder fileReader,
+      final int[][] blockIndexes) {
+    // result is indexed by column position, so only the requested slots are filled
+    DimensionColumnDataChunk[] dataChunks =
+        new DimensionColumnDataChunk[dimensionChunksOffset.size()];
+    // if no block index ranges are requested then return the array with nothing filled in
+    if (blockIndexes.length == 0) {
+      return dataChunks;
+    }
+    DimensionColumnDataChunk[] groupChunk = null;
+    int index = 0;
+    // iterate over all ranges but the final one and read each of them in group; the final
+    // range is handled below because it may be the last column
+    for (int i = 0; i < blockIndexes.length - 1; i++) {
+      index = 0;
+      groupChunk = readDimensionChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    // if the final range starts at the last column then read that column separately
+    if (blockIndexes[blockIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
+      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
+          readDimensionChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+    }
+    // otherwise read the final range in group as well
+    else {
+      groupChunk = readDimensionChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+          blockIndexes[blockIndexes.length - 1][1]);
+      index = 0;
+      for (int j = blockIndexes[blockIndexes.length - 1][0];
+           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Reads a single dimension column chunk. The last column needs two file reads (metadata,
+   * then pages); any other column is read in one go up to the offset of the next column.
+   * @param fileReader file reader to read the blocks from file
+   * @param blockIndex index of the column chunk to be read
+   * @return dimension column chunk built from the uncompressed pages
+   */
+  @Override public DimensionColumnDataChunk readDimensionChunk(FileHolder fileReader,
+      int blockIndex) {
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+    DataChunk2 dimensionColumnChunk = null;
+    byte[] data = null;
+    int copySourcePoint = 0;
+    byte[] dimensionChunk = null;
+    if (dimensionChunksOffset.size() - 1 == blockIndex) {
+      dimensionChunk = fileReader.readByteArray(filePath, dimensionChunksOffset.get(blockIndex),
+          dimensionChunksLength.get(blockIndex));
+      dimensionColumnChunk = CarbonUtil.readDataChunk(dimensionChunk);
+      int totalDimensionDataLength =
+          dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
+              + dimensionColumnChunk.rowid_page_length;
+      data = fileReader.readByteArray(filePath,
+          dimensionChunksOffset.get(blockIndex) + dimensionChunksLength.get(blockIndex),
+          totalDimensionDataLength);
+    } else {
+      long currentDimensionOffset = dimensionChunksOffset.get(blockIndex);
+      data = fileReader.readByteArray(filePath, currentDimensionOffset,
+          (int) (dimensionChunksOffset.get(blockIndex + 1) - currentDimensionOffset));
+      dimensionChunk = new byte[dimensionChunksLength.get(blockIndex)];
+      System.arraycopy(data, copySourcePoint, dimensionChunk, 0,
+          dimensionChunksLength.get(blockIndex));
+      dimensionColumnChunk = CarbonUtil.readDataChunk(dimensionChunk);
+      copySourcePoint += dimensionChunksLength.get(blockIndex);
+    }
+
+    byte[] compressedDataPage = new byte[dimensionColumnChunk.data_page_length];
+    System.arraycopy(data, copySourcePoint, compressedDataPage, 0,
+        dimensionColumnChunk.data_page_length);
+    copySourcePoint += dimensionColumnChunk.data_page_length;
+    // uncompress the data page that was copied out of the bytes read above
+    dataPage = COMPRESSOR.unCompress(compressedDataPage);
+    // if the chunk has an inverted index then copy out the row id page and uncompress it
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+      byte[] compressedIndexPage = new byte[dimensionColumnChunk.rowid_page_length];
+      System.arraycopy(data, copySourcePoint, compressedIndexPage, 0,
+          dimensionColumnChunk.rowid_page_length);
+      copySourcePoint += dimensionColumnChunk.rowid_page_length;
+      invertedIndexes = CarbonUtil
+          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, compressedIndexPage,
+              numberComressor);
+      // get the reverse index
+      invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+    }
+    // if rle is applied then copy out the rle page and use it to expand
+    // the already uncompressed data page
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+      // rle is the last page copied here, so the copy position is not advanced afterwards
+      byte[] compressedRLEPage = new byte[dimensionColumnChunk.rle_page_length];
+      System.arraycopy(data, copySourcePoint, compressedRLEPage, 0,
+          dimensionColumnChunk.rle_page_length);
+      rlePage = numberComressor.unCompress(compressedRLEPage);
+      // expand the data page using the rle indexes
+      dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
+      rlePage = null;
+    }
+    // fill chunk attributes
+    DimensionChunkAttributes chunkAttributes = new DimensionChunkAttributes();
+    chunkAttributes.setEachRowSize(eachColumnValueSize[blockIndex]);
+    chunkAttributes.setInvertedIndexes(invertedIndexes);
+    chunkAttributes.setInvertedIndexesReverse(invertedIndexesReverse);
+    DimensionColumnDataChunk columnDataChunk = null;
+
+    if (dimensionColumnChunk.isRowMajor()) {
+      // row major chunk is wrapped as a column group chunk
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, chunkAttributes);
+    }
+    // if no dictionary column then create a variable length chunk from the
+    // no dictionary data and mark the attributes as no dictionary
+    else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+      columnDataChunk =
+          new VariableLengthDimensionDataChunk(getNoDictionaryDataChunk(dataPage), chunkAttributes);
+      chunkAttributes.setNoDictionary(true);
+    } else {
+      // otherwise store fixed length column chunk values
+      columnDataChunk = new FixedLengthDimensionDataChunk(dataPage, chunkAttributes);
+    }
+    return columnDataChunk;
+  }
+
+  /**
+   * Reads the dimension chunks from start index to end index (inclusive) with one file read
+   * and then splits the bytes into chunks. The read length is computed from the offset at
+   * endBlockIndex + 1, so that index must be a valid position in the chunk offset list.
+   *
+   * @param fileReader      file reader used for reading
+   * @param startBlockIndex index of the first chunk to read
+   * @param endBlockIndex   index of the last chunk to read (inclusive)
+   * @return dimension column chunks in index order, one per requested block index
+   */
+  private DimensionColumnDataChunk[] readDimensionChunksInGroup(FileHolder fileReader,
+      int startBlockIndex, int endBlockIndex) {
+    long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
+    byte[] data = fileReader.readByteArray(filePath, currentDimensionOffset,
+        (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+    int copySourcePoint = 0;
+    // one result slot per block index in the requested range
+    DimensionColumnDataChunk[] dataChunks =
+        new DimensionColumnDataChunk[endBlockIndex - startBlockIndex + 1];
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+    byte[] dimensionChunk = null;
+    DataChunk2 dimensionColumnChunk = null;
+    int index = 0;
+    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+      invertedIndexes = null;
+      invertedIndexesReverse = null;
+      dimensionChunk = new byte[dimensionChunksLength.get(i)];
+      System.arraycopy(data, copySourcePoint, dimensionChunk, 0, dimensionChunksLength.get(i));
+      dimensionColumnChunk = CarbonUtil.readDataChunk(dimensionChunk);
+      copySourcePoint += dimensionChunksLength.get(i);
+      byte[] compressedDataPage = new byte[dimensionColumnChunk.data_page_length];
+      System.arraycopy(data, copySourcePoint, compressedDataPage, 0,
+          dimensionColumnChunk.data_page_length);
+      copySourcePoint += dimensionColumnChunk.data_page_length;
+      // uncompress the data page that was copied out of the grouped bytes
+      dataPage = COMPRESSOR.unCompress(compressedDataPage);
+      // if the chunk has an inverted index then copy out the row id page and uncompress it
+      if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+        byte[] compressedIndexPage = new byte[dimensionColumnChunk.rowid_page_length];
+        System.arraycopy(data, copySourcePoint, compressedIndexPage, 0,
+            dimensionColumnChunk.rowid_page_length);
+        copySourcePoint += dimensionColumnChunk.rowid_page_length;
+        invertedIndexes = CarbonUtil
+            .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, compressedIndexPage,
+                numberComressor);
+        // get the reverse index
+        invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+      }
+      // if rle is applied then copy out the rle page and use it to expand
+      // the already uncompressed data page
+      if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+        // copy and uncompress the rle page, advancing the position for the next chunk
+        byte[] compressedRLEPage = new byte[dimensionColumnChunk.rle_page_length];
+        System.arraycopy(data, copySourcePoint, compressedRLEPage, 0,
+            dimensionColumnChunk.rle_page_length);
+        copySourcePoint += dimensionColumnChunk.rle_page_length;
+        rlePage = numberComressor.unCompress(compressedRLEPage);
+        // expand the data page using the rle indexes
+        dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[i]);
+        rlePage = null;
+      }
+      // fill chunk attributes
+      DimensionChunkAttributes chunkAttributes = new DimensionChunkAttributes();
+      chunkAttributes.setEachRowSize(eachColumnValueSize[i]);
+      chunkAttributes.setInvertedIndexes(invertedIndexes);
+      chunkAttributes.setInvertedIndexesReverse(invertedIndexesReverse);
+      DimensionColumnDataChunk columnDataChunk = null;
+      if (dimensionColumnChunk.isRowMajor()) {
+        // row major chunk is wrapped as a column group chunk
+        columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, chunkAttributes);
+      }
+      // if no dictionary column then create a variable length chunk from the
+      // no dictionary data and mark the attributes as no dictionary
+      else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+        columnDataChunk = new VariableLengthDimensionDataChunk(getNoDictionaryDataChunk(dataPage),
+            chunkAttributes);
+        chunkAttributes.setNoDictionary(true);
+      } else {
+        // otherwise store fixed length column chunk values
+        columnDataChunk = new FixedLengthDimensionDataChunk(dataPage, chunkAttributes);
+      }
+      dataChunks[index++] = columnDataChunk;
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Checks whether a particular encoding is present in the given encoding list.
+   *
+   * @param encodings list of encodings to search in
+   * @param encoding  encoding to search
+   * @return true if the encoding is present in the list
+   */
+  private boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
+    return encodings.contains(encoding);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index dc8771f..6c74379 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -18,13 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.chunk.reader.measure;
 
-import java.util.List;
-
 import org.apache.carbondata.core.carbon.datastore.chunk.reader.MeasureColumnChunkReader;
-import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 
 /**
  * Measure block reader abstract class
@@ -32,44 +26,19 @@ import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonH
 public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkReader {
 
   /**
-   * metadata which was to used to compress and uncompress the measure value
-   */
-  protected ValueCompressionModel compressionModel;
-
-  /**
    * file path from which blocks will be read
    */
   protected String filePath;
 
   /**
-   * measure chunk have the information about the metadata present in the file
-   */
-  protected List<DataChunk> measureColumnChunk;
-
-  /**
-   * type of valu comprssion model selected for each measure column
-   */
-  protected UnCompressValue[] values;
-
-  /**
    * Constructor to get minimum parameter to create instance of this class
    *
    * @param measureColumnChunk measure chunk metadata
    * @param compression        model metadata which was to used to compress and uncompress
    *                           the measure value
    * @param filePath           file from which data will be read
-   * @param isInMemory         in case of in memory it will read and holds the data and when
-   *                           query request will come it will uncompress and the data
    */
-  public AbstractMeasureChunkReader(List<DataChunk> measureColumnChunk,
-      ValueCompressionModel compressionModel, String filePath, boolean isInMemory) {
-    this.measureColumnChunk = measureColumnChunk;
-    this.compressionModel = compressionModel;
+  public AbstractMeasureChunkReader(String filePath) {
     this.filePath = filePath;
-    values =
-        new ValueCompressonHolder.UnCompressValue[compressionModel.getUnCompressValues().length];
-    for (int i = 0; i < values.length; i++) {
-      values[i] = compressionModel.getUnCompressValues()[i].getNew().getCompressorObject();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReader.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReader.java
deleted file mode 100644
index 31c470d..0000000
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.carbon.datastore.chunk.reader.measure;
-
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
-import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-
-/**
- * Compressed measure chunk reader
- */
-public class CompressedMeasureChunkFileBasedReader extends AbstractMeasureChunkReader {
-
-  /**
-   * Constructor to get minimum parameter to create instance of this class
-   *
-   * @param measureColumnChunk measure chunk metadata
-   * @param compression        model metadata which was to used to compress and uncompress
-   *                           the measure value
-   * @param filePath           file from which data will be read
-   */
-  public CompressedMeasureChunkFileBasedReader(List<DataChunk> measureColumnChunk,
-      ValueCompressionModel compressionModel, String filePath) {
-    super(measureColumnChunk, compressionModel, filePath, false);
-  }
-
-  /**
-   * Method to read the blocks data based on block indexes
-   *
-   * @param fileReader   file reader to read the blocks
-   * @param blockIndexes blocks to be read
-   * @return measure data chunks
-   */
-  @Override public MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader,
-      int... blockIndexes) {
-    MeasureColumnDataChunk[] datChunk = new MeasureColumnDataChunk[values.length];
-    for (int i = 0; i < blockIndexes.length; i++) {
-      datChunk[blockIndexes[i]] = readMeasureChunk(fileReader, blockIndexes[i]);
-    }
-    return datChunk;
-  }
-
-  /**
-   * Method to read the blocks data based on block index
-   *
-   * @param fileReader file reader to read the blocks
-   * @param blockIndex block to be read
-   * @return measure data chunk
-   */
-  @Override public MeasureColumnDataChunk readMeasureChunk(FileHolder fileReader, int blockIndex) {
-    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
-    // create a new uncompressor
-    ValueCompressonHolder.UnCompressValue copy = values[blockIndex].getNew();
-    // read data from file and set to uncompressor
-    copy.setValue(fileReader
-        .readByteArray(filePath, measureColumnChunk.get(blockIndex).getDataPageOffset(),
-            measureColumnChunk.get(blockIndex).getDataPageLength()));
-    // get the data holder after uncompressing
-    CarbonReadDataHolder measureDataHolder =
-        copy.uncompress(compressionModel.getChangedDataType()[blockIndex])
-            .getValues(compressionModel.getDecimal()[blockIndex],
-                compressionModel.getMaxValue()[blockIndex]);
-    // set the data chunk
-    datChunk.setMeasureDataHolder(measureDataHolder);
-    // set the enun value indexes
-    datChunk
-        .setNullValueIndexHolder(measureColumnChunk.get(blockIndex).getNullValueIndexForColumn());
-    return datChunk;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
new file mode 100644
index 0000000..d989852
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v1;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Compressed measure chunk reader
+ */
+public class CompressedMeasureChunkFileBasedReaderV1 extends AbstractMeasureChunkReader {
+
+  /**
+   * measure chunk have the information about the metadata present in the file
+   */
+  private final List<DataChunk> measureColumnChunks;
+
+  /**
+   * Creates a reader for the measure column chunks described by the given blocklet info.
+   *
+   * @param blockletInfo blocklet metadata; its measure column chunk list is kept for reads
+   * @param filePath     file from which data will be read
+   */
+  public CompressedMeasureChunkFileBasedReaderV1(final BlockletInfo blockletInfo,
+      final String filePath) {
+    super(filePath);
+    this.measureColumnChunks = blockletInfo.getMeasureColumnChunk();
+  }
+
+  /**
+   * Reads every measure chunk whose index falls inside one of the given inclusive ranges.
+   *
+   * @param fileReader   file reader to read the blocks
+   * @param blockIndexes ranges to read, each entry being [first index, last index]
+   * @return array with one slot per measure column chunk; slots not requested stay null
+   */
+  @Override public MeasureColumnDataChunk[] readMeasureChunks(final FileHolder fileReader,
+      final int[][] blockIndexes) {
+    MeasureColumnDataChunk[] datChunk = new MeasureColumnDataChunk[measureColumnChunks.size()];
+    for (int i = 0; i < blockIndexes.length; i++) {
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        datChunk[j] = readMeasureChunk(fileReader, j);
+      }
+    }
+    return datChunk;
+  }
+
+  /**
+   * Reads, uncompresses and wraps the measure chunk stored at the given index.
+   *
+   * @param fileReader file reader to read the blocks
+   * @param blockIndex block to be read
+   * @return measure data chunk
+   */
+  @Override public MeasureColumnDataChunk readMeasureChunk(final FileHolder fileReader,
+      final int blockIndex) {
+    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
+    // build the compression model from the value encoder meta of the requested chunk
+    final ValueCompressionModel compressionModel = CarbonUtil
+        .getValueCompressionModel(measureColumnChunks.get(blockIndex).getValueEncoderMeta());
+    UnCompressValue values =
+        compressionModel.getUnCompressValues()[0].getNew().getCompressorObject();
+    // the uncompressor above is taken from the model's first entry (index 0)
+    // read the data page from the file, using the offset and length recorded in the
+    // chunk metadata, and set it to the uncompressor
+    values.setValue(fileReader
+        .readByteArray(filePath, measureColumnChunks.get(blockIndex).getDataPageOffset(),
+            measureColumnChunks.get(blockIndex).getDataPageLength()));
+    // get the data holder after uncompressing
+    CarbonReadDataHolder measureDataHolder =
+        values.uncompress(compressionModel.getChangedDataType()[0])
+            .getValues(compressionModel.getDecimal()[0], compressionModel.getMaxValue()[0]);
+    // set the data chunk
+    datChunk.setMeasureDataHolder(measureDataHolder);
+    // set the null value indexes
+    datChunk
+        .setNullValueIndexHolder(measureColumnChunks.get(blockIndex).getNullValueIndexForColumn());
+    return datChunk;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
new file mode 100644
index 0000000..71b7c8f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v2;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
+import org.apache.carbondata.core.datastorage.store.FileHolder;
+import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression.SnappyByteCompression;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
+import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+
+/**
+ * Class to read the measure column data for version 2
+ */
+public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReader {
+
+  /**
+   * measure column chunks offset
+   */
+  private List<Long> measureColumnChunkOffsets;
+
+  /**
+   * measure column chunks length
+   */
+  private List<Short> measureColumnChunkLength;
+
+  /**
+   * Creates a reader that works from the measure chunk offsets and lengths of a blocklet.
+   *
+   * @param blockletInfo blocklet metadata supplying the measure chunk offsets and lengths
+   * @param filePath     file from which data will be read
+   */
+  public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
+      final String filePath) {
+    super(filePath);
+    this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
+    this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
+  }
+
+  /**
+   * Converts the thrift presence meta to the wrapper presence meta. The present bit
+   * stream is uncompressed with snappy and turned into a BitSet.
+   *
+   * @param presentMetadataThrift thrift presence meta read from the data chunk
+   * @return wrapper presence meta
+   */
+  private static PresenceMeta getPresenceMeta(
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+    PresenceMeta presenceMeta = new PresenceMeta();
+    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+    presenceMeta.setBitSet(BitSet.valueOf(
+        SnappyByteCompression.INSTANCE.unCompress(presentMetadataThrift.getPresent_bit_stream())));
+    return presenceMeta;
+  }
+
+  /**
+   * Reads the measure chunks for the given inclusive index ranges.
+   * Every range except the final one is fetched with a single grouped read.
+   * The final range is read on its own as a single chunk when it starts at the last
+   * column of the blocklet, otherwise it is fetched with a grouped read as well.
+   * NOTE(review): a grouped read looks up the offset of the column after the range, so a
+   * final range that starts earlier but ends on the last column looks unsupported - confirm.
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes ranges to read, each entry being [first index, last index]
+   * @return array with one slot per measure column chunk; slots not requested stay null
+   */
+  public MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, int[][] blockIndexes) {
+    // result array has one slot per measure column chunk offset of the blocklet
+    MeasureColumnDataChunk[] dataChunks =
+        new MeasureColumnDataChunk[measureColumnChunkOffsets.size()];
+    if (blockIndexes.length == 0) {
+      return dataChunks;
+    }
+    MeasureColumnDataChunk[] groupChunk = null;
+    int index = 0;
+    for (int i = 0; i < blockIndexes.length - 1; i++) {
+      index = 0;
+      groupChunk = readMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
+      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
+          readMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+    } else {
+      groupChunk = readMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+          blockIndexes[blockIndexes.length - 1][1]);
+      index = 0;
+      for (int j = blockIndexes[blockIndexes.length - 1][0];
+           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Reads one measure chunk: its DataChunk2 metadata followed by its data page.
+   *
+   * @param fileReader file reader to read the blocks
+   * @param blockIndex block to be read
+   * @return measure data chunk
+   */
+  @Override public MeasureColumnDataChunk readMeasureChunk(FileHolder fileReader, int blockIndex) {
+    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
+    DataChunk2 measureColumnChunk = null;
+    byte[] measureDataChunk = null;
+    byte[] data = null;
+    byte[] dataPage = null;
+    if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
+      measureDataChunk = fileReader
+          .readByteArray(filePath, measureColumnChunkOffsets.get(blockIndex),
+              measureColumnChunkLength.get(blockIndex));
+      measureColumnChunk = CarbonUtil.readDataChunk(measureDataChunk);
+      dataPage = fileReader.readByteArray(filePath,
+          measureColumnChunkOffsets.get(blockIndex) + measureColumnChunkLength.get(blockIndex),
+          measureColumnChunk.data_page_length);
+    } else {
+      long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
+      data = fileReader.readByteArray(filePath, currentMeasureOffset,
+          (int) (measureColumnChunkOffsets.get(blockIndex + 1) - currentMeasureOffset));
+      measureDataChunk = new byte[measureColumnChunkLength.get(blockIndex)];
+      System.arraycopy(data, 0, measureDataChunk, 0, measureColumnChunkLength.get(blockIndex));
+      measureColumnChunk = CarbonUtil.readDataChunk(measureDataChunk);
+      dataPage = new byte[measureColumnChunk.data_page_length];
+      System.arraycopy(data, measureColumnChunkLength.get(blockIndex), dataPage, 0,
+          measureColumnChunk.data_page_length);
+    }
+    List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+    for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
+      valueEncodeMeta.add(
+          CarbonUtil.deserializeEncoderMeta(measureColumnChunk.getEncoder_meta().get(i).array()));
+    }
+    ValueCompressionModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+    UnCompressValue values =
+        compressionModel.getUnCompressValues()[0].getNew().getCompressorObject();
+    // the uncompressor above is taken from the model's first entry (index 0)
+    // set the data page bytes obtained above to the uncompressor
+    values.setValue(dataPage);
+    // get the data holder after uncompressing
+    CarbonReadDataHolder measureDataHolder =
+        values.uncompress(compressionModel.getChangedDataType()[0])
+            .getValues(compressionModel.getDecimal()[0], compressionModel.getMaxValue()[0]);
+    // set the data chunk
+    datChunk.setMeasureDataHolder(measureDataHolder);
+    // set the null value indexes built from the chunk's presence meta
+    datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+    return datChunk;
+  }
+
+  /**
+   * Below method will be used to read the measure chunks in group.
+   * This is to enhance the IO performance. A single read covers the bytes from the start
+   * index up to the offset of the chunk following the end index (end index is included).
+   *
+   * @param fileReader      stream used for reading
+   * @param startBlockIndex start block index
+   * @param endBlockIndex   end block index; the offset at endBlockIndex + 1 is also read
+   * @return measure column chunk array, one entry per index in the requested range
+   */
+  private MeasureColumnDataChunk[] readMeasureChunksInGroup(FileHolder fileReader,
+      int startBlockIndex, int endBlockIndex) {
+    long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
+    byte[] data = fileReader.readByteArray(filePath, currentMeasureOffset,
+        (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+    MeasureColumnDataChunk[] dataChunks =
+        new MeasureColumnDataChunk[endBlockIndex - startBlockIndex + 1];
+    MeasureColumnDataChunk dataChunk = new MeasureColumnDataChunk();
+    int index = 0;
+    int copyPoint = 0;
+    byte[] measureDataChunk = null;
+    byte[] dataPage = null;
+    DataChunk2 measureColumnChunk = null;
+    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+      dataChunk = new MeasureColumnDataChunk();
+      measureDataChunk = new byte[measureColumnChunkLength.get(i)];
+      System.arraycopy(data, copyPoint, measureDataChunk, 0, measureColumnChunkLength.get(i));
+      measureColumnChunk = CarbonUtil.readDataChunk(measureDataChunk);
+      dataPage = new byte[measureColumnChunk.data_page_length];
+      copyPoint += measureColumnChunkLength.get(i);
+      System.arraycopy(data, copyPoint, dataPage, 0, measureColumnChunk.data_page_length);
+      copyPoint += measureColumnChunk.data_page_length;
+      List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+      for (int j = 0; j < measureColumnChunk.getEncoder_meta().size(); j++) {
+        valueEncodeMeta.add(
+            CarbonUtil.deserializeEncoderMeta(measureColumnChunk.getEncoder_meta().get(j).array()));
+      }
+      ValueCompressionModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+      UnCompressValue values =
+          compressionModel.getUnCompressValues()[0].getNew().getCompressorObject();
+      // the uncompressor above is taken from the model's first entry (index 0)
+      // set the data page bytes copied out of the grouped read to the uncompressor
+      values.setValue(dataPage);
+      // get the data holder after uncompressing
+      CarbonReadDataHolder measureDataHolder =
+          values.uncompress(compressionModel.getChangedDataType()[0])
+              .getValues(compressionModel.getDecimal()[0], compressionModel.getMaxValue()[0]);
+      // set the data chunk
+      dataChunk.setMeasureDataHolder(measureDataHolder);
+      // set the null value indexes built from the chunk's presence meta
+      dataChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+      dataChunks[index++] = dataChunk;
+    }
+    return dataChunks;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
index de476ad..bff3286 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/AbstractBTreeLeafNode.java
@@ -172,7 +172,7 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
    * @return dimension data chunks
    */
   @Override public DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader,
-      int[] blockIndexes) {
+      int[][] blockIndexes) {
     // No required here as leaf which will will be use this class will implement its own get
     // dimension chunks
     return null;
@@ -200,7 +200,7 @@ public abstract class AbstractBTreeLeafNode implements BTreeNode {
    * @return measure column data chunk
    */
   @Override public MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader,
-      int[] blockIndexes) {
+      int[][] blockIndexes) {
     // No required here as leaf which will will be use this class will implement its own get
     // measure chunks
     return null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
index cfbe06d..c558763 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BTreeNonLeafNode.java
@@ -169,7 +169,7 @@ public class BTreeNonLeafNode implements BTreeNode {
    * @return dimension data chunks
    */
   @Override public DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader,
-      int[] blockIndexes) {
+      int[][] blockIndexes) {
 
     // operation of getting the dimension chunks is not supported as its a
     // non leaf node
@@ -204,7 +204,7 @@ public class BTreeNonLeafNode implements BTreeNode {
    * @return measure column data chunk
    */
   @Override public MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader,
-      int[] blockIndexes) {
+      int[][] blockIndexes) {
     // operation of getting the measure chunk is not supported as its a non
     // leaf node
     // and in case of B+Tree data will be stored only in leaf node and

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockletBTreeLeafNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockletBTreeLeafNode.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockletBTreeLeafNode.java
index 4293610..79ee008 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockletBTreeLeafNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/impl/btree/BlockletBTreeLeafNode.java
@@ -21,14 +21,11 @@ package org.apache.carbondata.core.carbon.datastore.impl.btree;
 import org.apache.carbondata.core.carbon.datastore.BTreeBuilderInfo;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.CarbonDataReaderFactory;
 import org.apache.carbondata.core.carbon.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.carbon.datastore.chunk.reader.MeasureColumnChunkReader;
-import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.CompressedDimensionChunkFileBasedReader;
-import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.CompressedMeasureChunkFileBasedReader;
 import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletMinMaxIndex;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Leaf node class of a Blocklet btree
@@ -57,28 +54,25 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
   public BlockletBTreeLeafNode(BTreeBuilderInfo builderInfos, int leafIndex, long nodeNumber) {
     // get a lead node min max
     BlockletMinMaxIndex minMaxIndex =
-        builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex)
-            .getBlockletIndex().getMinMaxIndex();
+        builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex).getBlockletIndex()
+            .getMinMaxIndex();
     // max key of the columns
     maxKeyOfColumns = minMaxIndex.getMaxValues();
     // min keys of the columns
     minKeyOfColumns = minMaxIndex.getMinValues();
     // number of keys present in the leaf
-    numberOfKeys = builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex)
-        .getNumberOfRows();
+    numberOfKeys =
+        builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex).getNumberOfRows();
     // create a instance of dimension chunk
-    dimensionChunksReader = new CompressedDimensionChunkFileBasedReader(
-        builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex)
-            .getDimensionColumnChunk(), builderInfos.getDimensionColumnValueSize(),
-        builderInfos.getFooterList().get(0).getBlockInfo().getTableBlockInfo().getFilePath());
-    // get the value compression model which was used to compress the measure values
-    ValueCompressionModel valueCompressionModel = CarbonUtil.getValueCompressionModel(
-        builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex)
-            .getMeasureColumnChunk());
+    dimensionChunksReader = CarbonDataReaderFactory.getInstance()
+        .getDimensionColumnChunkReader(builderInfos.getFooterList().get(0).getVersionId(),
+            builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex),
+            builderInfos.getDimensionColumnValueSize(),
+            builderInfos.getFooterList().get(0).getBlockInfo().getTableBlockInfo().getFilePath());
     // create a instance of measure column chunk reader
-    measureColumnChunkReader = new CompressedMeasureChunkFileBasedReader(
-        builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex)
-            .getMeasureColumnChunk(), valueCompressionModel,
+    measureColumnChunkReader = CarbonDataReaderFactory.getInstance()
+        .getMeasureColumnChunkReader(builderInfos.getFooterList().get(0).getVersionId(),
+            builderInfos.getFooterList().get(0).getBlockletList().get(leafIndex),
             builderInfos.getFooterList().get(0).getBlockInfo().getTableBlockInfo().getFilePath());
     this.nodeNumber = nodeNumber;
   }
@@ -91,7 +85,7 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
    * @return dimension data chunks
    */
   @Override public DimensionColumnDataChunk[] getDimensionChunks(FileHolder fileReader,
-      int[] blockIndexes) {
+      int[][] blockIndexes) {
     return dimensionChunksReader.readDimensionChunks(fileReader, blockIndexes);
   }
 
@@ -115,7 +109,7 @@ public class BlockletBTreeLeafNode extends AbstractBTreeLeafNode {
    * @return measure column data chunk
    */
   @Override public MeasureColumnDataChunk[] getMeasureChunks(FileHolder fileReader,
-      int[] blockIndexes) {
+      int[][] blockIndexes) {
     return measureColumnChunkReader.readMeasureChunks(fileReader, blockIndexes);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/BlockletInfo.java
index b2c72aa..314f7e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/BlockletInfo.java
@@ -50,6 +50,14 @@ public class BlockletInfo implements Serializable {
    */
   private List<DataChunk> measureColumnChunk;
 
+  private List<Long> dimensionChunkOffsets;
+
+  private List<Short> dimensionChunksLength;
+
+  private List<Long> measureChunkOffsets;
+
+  private List<Short> measureChunksLength;
+
   /**
    * to store the index like min max and start and end key of each column of the blocklet
    */
@@ -111,4 +119,36 @@ public class BlockletInfo implements Serializable {
     this.blockletIndex = blockletIndex;
   }
 
+  public List<Long> getDimensionChunkOffsets() {
+    return dimensionChunkOffsets;
+  }
+
+  public void setDimensionChunkOffsets(List<Long> dimensionChunkOffsets) {
+    this.dimensionChunkOffsets = dimensionChunkOffsets;
+  }
+
+  public List<Short> getDimensionChunksLength() {
+    return dimensionChunksLength;
+  }
+
+  public void setDimensionChunksLength(List<Short> dimensionChunksLength) {
+    this.dimensionChunksLength = dimensionChunksLength;
+  }
+
+  public List<Long> getMeasureChunkOffsets() {
+    return measureChunkOffsets;
+  }
+
+  public void setMeasureChunkOffsets(List<Long> measureChunkOffsets) {
+    this.measureChunkOffsets = measureChunkOffsets;
+  }
+
+  public List<Short> getMeasureChunksLength() {
+    return measureChunksLength;
+  }
+
+  public void setMeasureChunksLength(List<Short> measureChunksLength) {
+    this.measureChunksLength = measureChunksLength;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
index d4741eb..be235ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
@@ -38,7 +38,7 @@ public class DataFileFooter implements Serializable {
   /**
    * version used for data compatibility
    */
-  private int versionId;
+  private short versionId;
 
   /**
    * total number of rows in this file
@@ -73,14 +73,14 @@ public class DataFileFooter implements Serializable {
   /**
    * @return the versionId
    */
-  public int getVersionId() {
+  public short getVersionId() {
     return versionId;
   }
 
   /**
    * @param versionId the versionId to set
    */
-  public void setVersionId(int versionId) {
+  public void setVersionId(short versionId) {
     this.versionId = versionId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index e217d5d..a9e0442 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -893,7 +893,23 @@ public final class CarbonCommonConstants {
    * Default size of data load batch size.
    */
   public static final String DATA_LOAD_BATCH_SIZE_DEFAULT = "1000";
-
+  /**
+   * carbon data file version property
+   */
+  public static final String CARBON_DATA_FILE_VERSION = "carbon.data.file.version";
+  /**
+   * current data file version
+   */
+  public static final short CARBON_DATA_FILE_DEFAULT_VERSION = 2;
+  /**
+   * number of columns whose data will be read in one IO operation
+   * during query execution
+   */
+  public static final short NUMBER_OF_COLUMN_READ_IN_IO = 10;
+  /**
+   * data file version header
+   */
+  public static final String CARBON_DATA_VERSION_HEADER = "CARBONDATAVERSION#";
   private CarbonCommonConstants() {
   }
 }