Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/26 14:54:31 UTC

[3/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code

Added V3 Format Writer and Reader Code

Added code to support V3 Writer + Reader


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cf1104d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cf1104d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cf1104d

Branch: refs/heads/master
Commit: 2cf1104db43a5591fe2bcabb97ba02202428132a
Parents: 922683e
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Feb 23 16:44:41 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Feb 26 22:42:32 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  38 +-
 .../constants/CarbonV3DataFormatConstants.java  |  84 ++++
 .../datastore/chunk/AbstractRawColumnChunk.java |  11 +
 .../chunk/reader/CarbonDataReaderFactory.java   |  14 +-
 .../AbstractChunkReaderV2V3Format.java          | 126 +++++
 ...mpressedDimensionChunkFileBasedReaderV2.java | 127 ++---
 ...mpressedDimensionChunkFileBasedReaderV3.java | 268 ++++++++++
 .../AbstractMeasureChunkReaderV2V3Format.java   | 124 +++++
 ...CompressedMeasureChunkFileBasedReaderV2.java | 106 +---
 ...CompressedMeasureChunkFileBasedReaderV3.java | 239 +++++++++
 .../SafeFixedLengthDimensionDataChunkStore.java |   9 +-
 .../columnar/BlockIndexerStorageForShort.java   | 228 +++++++++
 .../columnar/ColumnWithShortIndex.java          |  76 +++
 .../ColumnWithShortIndexForNoDictionay.java     |  46 ++
 .../core/metadata/ColumnarFormatVersion.java    |   9 +-
 .../executor/impl/AbstractQueryExecutor.java    |  10 +-
 .../IncludeColGroupFilterExecuterImpl.java      |  24 +
 ...velRangeLessThanEqualFilterExecuterImpl.java |  14 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  14 +-
 .../carbondata/core/util/BitSetGroup.java       |   6 +-
 .../core/util/CarbonMetadataUtil.java           | 347 ++++++++++++-
 .../carbondata/core/util/CarbonProperties.java  | 110 +++-
 .../apache/carbondata/core/util/CarbonUtil.java | 110 ++++
 .../util/DataFileFooterConverterFactory.java    |   5 +-
 .../core/util/DataFileFooterConverterV3.java    | 141 ++++++
 .../apache/carbondata/core/util/NodeHolder.java | 430 ++++++++++++++++
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 format/src/main/thrift/carbondata.thrift        |  27 +-
 .../store/CarbonDataWriterFactory.java          |   7 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  56 ++-
 .../store/writer/AbstractFactDataWriter.java    |  54 +-
 .../store/writer/CarbonFactDataWriter.java      |   1 +
 .../processing/store/writer/NodeHolder.java     | 410 ---------------
 .../writer/v1/CarbonFactDataWriterImplV1.java   |   9 +-
 .../writer/v2/CarbonFactDataWriterImplV2.java   |   8 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 499 +++++++++++++++++++
 .../store/writer/v3/DataWriterHolder.java       |  68 +++
 37 files changed, 3114 insertions(+), 743 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1142c4e..146b78e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -83,7 +83,7 @@ public final class CarbonCommonConstants {
   /**
    * min blocklet size
    */
-  public static final int BLOCKLET_SIZE_MIN_VAL = 50;
+  public static final int BLOCKLET_SIZE_MIN_VAL = 2000;
   /**
    * max blocklet size
    */
@@ -791,7 +791,7 @@ public final class CarbonCommonConstants {
   public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true";
 
   /**
-   *  default name of data base
+   * default name of database
    */
   public static final String DATABASE_DEFAULT_NAME = "default";
 
@@ -808,8 +808,7 @@ public final class CarbonCommonConstants {
   /**
    * this variable is to enable/disable identify high cardinality during first data loading
    */
-  public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE =
-      "high.cardinality.identify.enable";
+  public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE = "high.cardinality.identify.enable";
   public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT = "true";
 
   /**
@@ -843,26 +842,23 @@ public final class CarbonCommonConstants {
   /**
    * ZOOKEEPERLOCK TYPE
    */
-  public static final String CARBON_LOCK_TYPE_ZOOKEEPER =
-      "ZOOKEEPERLOCK";
+  public static final String CARBON_LOCK_TYPE_ZOOKEEPER = "ZOOKEEPERLOCK";
 
   /**
    * LOCALLOCK TYPE
    */
-  public static final String CARBON_LOCK_TYPE_LOCAL =
-      "LOCALLOCK";
+  public static final String CARBON_LOCK_TYPE_LOCAL = "LOCALLOCK";
 
   /**
    * HDFSLOCK TYPE
    */
-  public static final String CARBON_LOCK_TYPE_HDFS =
-      "HDFSLOCK";
+  public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK";
 
   /**
    * Invalid filter member log string
    */
-  public static final String FILTER_INVALID_MEMBER = " Invalid Record(s) are present "
-                                                     + "while filter evaluation. ";
+  public static final String FILTER_INVALID_MEMBER =
+      " Invalid Record(s) are present while filter evaluation. ";
 
   /**
    * Number of unmerged segments to be merged.
@@ -880,25 +876,23 @@ public final class CarbonCommonConstants {
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
   public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
-          "carbon.horizontal.update.compaction.threshold";
+      "carbon.horizontal.update.compaction.threshold";
   /**
    * Default count of segments which act as a threshold for IUD compaction merge.
    */
   public static final String DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
 
-
   /**
    * Number of Delete Delta files which is the Threshold for IUD compaction.
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
-  public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION  =
+  public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
       "carbon.horizontal.delete.compaction.threshold";
   /**
    * Default count of segments which act as a threshold for IUD compaction merge.
    */
   public static final String DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
 
-
   /**
    * default location of the carbon metastore db
    */
@@ -943,8 +937,7 @@ public final class CarbonCommonConstants {
    * @Deprecated : This property has been deprecated.
    * Property for enabling system level compaction lock.1 compaction can run at once.
    */
-  public static String ENABLE_CONCURRENT_COMPACTION =
-      "carbon.concurrent.compaction";
+  public static String ENABLE_CONCURRENT_COMPACTION = "carbon.concurrent.compaction";
 
   /**
    * Default value of Property for enabling system level compaction lock.1 compaction can run
@@ -1024,12 +1017,8 @@ public final class CarbonCommonConstants {
   /**
    * current data file version
    */
-  public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V2";
-  /**
-   * number of column data will read in IO operation
-   * during query execution
-   */
-  public static final short NUMBER_OF_COLUMN_READ_IN_IO = 10;
+  public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V3";
+
   /**
    * data file version header
    */
@@ -1105,7 +1094,6 @@ public final class CarbonCommonConstants {
 
   /**
    * Default carbon dictionary server port
-
    */
   public static final String DICTIONARY_SERVER_PORT_DEFAULT = "2030";
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
new file mode 100644
index 0000000..060b55c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.constants;
+
+/**
+ * Constants for V3 data format
+ */
+public interface CarbonV3DataFormatConstants {
+
+  /**
+   * number of page per blocklet column
+   */
+  String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN = "carbon.number.of.page.in.blocklet.column";
+
+  /**
+   * number of page per blocklet column default value
+   */
+  String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE = "10";
+
+  /**
+   * number of page per blocklet column max value
+   */
+  short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX = 20;
+
+  /**
+   * number of page per blocklet column min value
+   */
+  short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN = 1;
+
+  /**
+   * number of column to be read in one IO in query
+   */
+  String NUMBER_OF_COLUMN_TO_READ_IN_IO = "number.of.column.to.read.in.io";
+
+  /**
+   * number of column to be read in one IO in query default value
+   */
+  String NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE = "10";
+
+  /**
+   * number of column to be read in one IO in query max value
+   */
+  short NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX = 20;
+
+  /**
+   * number of column to be read in one IO in query min value
+   */
+  short NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN = 1;
+
+  /**
+   * number of rows per blocklet column page
+   */
+  String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE = "number.of.rows.per.blocklet.column.page";
+
+  /**
+   * number of rows per blocklet column page default value
+   */
+  String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT = "32000";
+
+  /**
+   * number of rows per blocklet column page max value
+   */
+  short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX = 32000;
+
+  /**
+   * number of rows per blocklet column page min value
+   */
+  short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN = 8000;
+
+}
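
The constants above follow a default/min/max pattern: a configured value outside [MIN, MAX] is expected to fall back to the default. Below is a minimal, hypothetical sketch of that validation; the real handling presumably sits in CarbonProperties (also touched in this commit), so the class and helper names here are illustrative only.

import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;

public final class V3ConfigValidationSketch {

  // Hypothetical helper: resolve the number of pages per blocklet column,
  // falling back to the default when the configured value is missing,
  // non-numeric, or outside the allowed [MIN, MAX] range.
  public static short resolvePagesPerBlockletColumn(String configuredValue) {
    short defaultValue = Short.parseShort(
        CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
    short value;
    try {
      value = Short.parseShort(configuredValue);
    } catch (NumberFormatException e) {
      return defaultValue;
    }
    if (value < CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN
        || value > CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX) {
      return defaultValue;
    }
    return value;
  }
}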

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
index d04077c..eebb382 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk;
 
 import java.nio.ByteBuffer;
 
+import org.apache.carbondata.format.DataChunk3;
 
 /**
  * It contains group of uncompressed blocklets on one column.
@@ -44,6 +45,8 @@ public abstract class AbstractRawColumnChunk {
 
   protected int length;
 
+  protected DataChunk3 dataChunkV3;
+
   public AbstractRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length) {
     this.blockletId = blockletId;
     this.rawData = rawData;
@@ -121,4 +124,12 @@ public abstract class AbstractRawColumnChunk {
     return length;
   }
 
+  public DataChunk3 getDataChunkV3() {
+    return dataChunkV3;
+  }
+
+  public void setDataChunkV3(DataChunk3 dataChunkV3) {
+    this.dataChunkV3 = dataChunkV3;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
index e20fcbe..8fee760 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -18,8 +18,10 @@ package org.apache.carbondata.core.datastore.chunk.reader;
 
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.v3.CompressedDimensionChunkFileBasedReaderV3;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.v2.CompressedMeasureChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.v3.CompressedMeasureChunkFileBasedReaderV3;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 
@@ -65,9 +67,13 @@ public class CarbonDataReaderFactory {
       case V1:
         return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, eachColumnValueSize,
             filePath);
-      default:
+      case V2:
         return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, eachColumnValueSize,
             filePath);
+      case V3:
+      default:
+        return new CompressedDimensionChunkFileBasedReaderV3(blockletInfo, eachColumnValueSize,
+            filePath);
     }
   }
 
@@ -84,8 +90,12 @@ public class CarbonDataReaderFactory {
     switch (version) {
       case V1:
         return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, filePath);
-      default:
+      case V2:
         return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, filePath);
+      case V3:
+      default:
+        return new CompressedMeasureChunkFileBasedReaderV3(blockletInfo, filePath);
+
     }
 
   }
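
A hedged usage sketch of the factory dispatch above. The getInstance/getDimensionColumnChunkReader/getMeasureColumnChunkReader names and the reader interfaces are assumed from the imports and switch bodies shown here, not verified signatures; version, blockletInfo, eachColumnValueSize and filePath stand for values a caller already has.

DimensionColumnChunkReader dimensionReader = CarbonDataReaderFactory.getInstance()
    .getDimensionColumnChunkReader(version, blockletInfo, eachColumnValueSize, filePath);
MeasureColumnChunkReader measureReader = CarbonDataReaderFactory.getInstance()
    .getMeasureColumnChunkReader(version, blockletInfo, filePath);
// For version == ColumnarFormatVersion.V3 (and, via the default case, for any
// unknown version) the V3 reader implementations added in this commit are
// returned; V1 and V2 keep their existing readers.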

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
new file mode 100644
index 0000000..f083612
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Abstract class for V2, V3 format dimension column reader
+ */
+public abstract class AbstractChunkReaderV2V3Format extends AbstractChunkReader {
+
+  /**
+   * dimension chunks offset
+   */
+  protected List<Long> dimensionChunksOffset;
+
+  /**
+   * dimension chunks length
+   */
+  protected List<Integer> dimensionChunksLength;
+
+  public AbstractChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+      final int[] eachColumnValueSize, final String filePath) {
+    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
+    dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
+    dimensionChunksLength = blockletInfo.getDimensionChunksLength();
+  }
+
+  /**
+   * Below method will be used to read the chunks based on blocklet indexes.
+   * Reading logic of this method:
+   * Except for the last column, all column chunks can be read in a group:
+   * for non-last columns, the data of all the columns present in a blocklet
+   * index range is read together and then processed.
+   * The last column is read and processed separately.
+   *
+   * @param fileReader      file reader to read the blocks from file
+   * @param blockletIndexes blocks range to be read
+   * @return dimension column chunks
+   */
+  @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
+      final int[][] blockletIndexes) throws IOException {
+    // read the column chunk based on block index and add
+    DimensionRawColumnChunk[] dataChunks =
+        new DimensionRawColumnChunk[dimensionChunksOffset.size()];
+    // if blocklet index is empty then return empty data chunk
+    if (blockletIndexes.length == 0) {
+      return dataChunks;
+    }
+    DimensionRawColumnChunk[] groupChunk = null;
+    int index = 0;
+    // iterate till block indexes -1 as block index will be in sorted order, so to avoid
+    // the last column reading in group
+    for (int i = 0; i < blockletIndexes.length - 1; i++) {
+      index = 0;
+      groupChunk =
+          readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
+      for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    // check last index is present in block index, if it is present then read separately
+    if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
+      dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
+          readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
+    }
+    // otherwise read the data in group
+    else {
+      groupChunk =
+          readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
+              blockletIndexes[blockletIndexes.length - 1][1]);
+      index = 0;
+      for (int j = blockletIndexes[blockletIndexes.length - 1][0];
+           j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Below method will be used to read dimension chunk data in group.
+   * This method is useful to avoid multiple IO calls while reading the
+   * data from the file.
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read
+   * @return dimension raw chunk array
+   * @throws IOException
+   */
+  protected abstract DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+
+  /**
+   * Below method will be used to check whether particular encoding is present
+   * in the dimension or not
+   *
+   * @param encoding encoding to search
+   * @return if encoding is present in dimension
+   */
+  protected boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
+    return encodings.contains(encoding);
+  }
+
+}
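
To make the grouping rule in readRawDimensionChunks concrete, here is a small hypothetical walk-through; the column count and index ranges are made up for illustration.

// Assume 5 dimension columns, so dimensionChunksOffset.size() == 5, and
int[][] blockletIndexes = { { 0, 2 }, { 4, 4 } };
// - {0, 2} is not the last range, so columns 0..2 are fetched with a single
//   readRawDimensionChunksInGroup(fileReader, 0, 2) call (one IO for three columns).
// - the last range {4, 4} starts at the last column index (size - 1), so column 4
//   is read separately via readRawDimensionChunk(fileReader, 4), because the end
//   of the last column cannot be derived from a "next column" offset.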

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 9d5849f..b2201cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
@@ -26,7 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataC
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -36,17 +35,7 @@ import org.apache.carbondata.format.Encoding;
 /**
  * Compressed dimension chunk reader class for version 2
  */
-public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReader {
-
-  /**
-   * dimension chunks offset
-   */
-  private List<Long> dimensionChunksOffset;
-
-  /**
-   * dimension chunks length
-   */
-  private List<Integer> dimensionChunksLength;
+public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReaderV2V3Format {
 
   /**
    * Constructor to get minimum parameter to create instance of this class
@@ -57,73 +46,18 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
    */
   public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final int[] eachColumnValueSize, final String filePath) {
-    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
-    this.dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
-    this.dimensionChunksLength = blockletInfo.getDimensionChunksLength();
-
-  }
-
-  /**
-   * Below method will be used to read the chunk based on block indexes
-   * Reading logic of below method is:
-   * Except last column all the column chunk can be read in group
-   * if not last column then read data of all the column present in block index
-   * together then process it.
-   * For last column read is separately and process
-   *
-   * @param fileReader   file reader to read the blocks from file
-   * @param blockletIndexes blocks range to be read
-   * @return dimension column chunks
-   */
-  @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
-      final int[][] blockletIndexes) throws IOException {
-    // read the column chunk based on block index and add
-    DimensionRawColumnChunk[] dataChunks =
-        new DimensionRawColumnChunk[dimensionChunksOffset.size()];
-    // if blocklet index is empty then return empry data chunk
-    if (blockletIndexes.length == 0) {
-      return dataChunks;
-    }
-    DimensionRawColumnChunk[] groupChunk = null;
-    int index = 0;
-    // iterate till block indexes -1 as block index will be in sorted order, so to avoid
-    // the last column reading in group
-    for (int i = 0; i < blockletIndexes.length - 1; i++) {
-      index = 0;
-      groupChunk =
-          readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
-      for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    // check last index is present in block index, if it is present then read separately
-    if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
-      dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
-          readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
-    }
-    // otherwise read the data in group
-    else {
-      groupChunk =
-          readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
-              blockletIndexes[blockletIndexes.length - 1][1]);
-      index = 0;
-      for (int j = blockletIndexes[blockletIndexes.length - 1][0];
-           j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    return dataChunks;
+    super(blockletInfo, eachColumnValueSize, filePath);
   }
 
   /**
    * Below method will be used to read the chunk based on block index
    *
-   * @param fileReader file reader to read the blocks from file
+   * @param fileReader    file reader to read the blocks from file
    * @param blockletIndex block to be read
    * @return dimension column chunk
    */
-  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
-      int blockletIndex) throws IOException {
+  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int blockletIndex)
+      throws IOException {
     int length = 0;
     if (dimensionChunksOffset.size() - 1 == blockletIndex) {
       // Incase of last block read only for datachunk and read remaining while converting it.
@@ -140,24 +74,35 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
         new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
     rawColumnChunk.setFileHolder(fileReader);
     rawColumnChunk.setPagesCount(1);
-    rawColumnChunk.setRowCount(new int[]{numberOfRows});
+    rawColumnChunk.setRowCount(new int[] { numberOfRows });
     return rawColumnChunk;
   }
 
-  private DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
-      int startBlockIndex, int endBlockIndex) throws IOException {
-    long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
+  /**
+   * Below method will be used to read dimension chunk data in group.
+   * This method is useful to avoid multiple IO calls while reading the
+   * data from the file.
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read
+   * @return dimension raw chunk array
+   * @throws IOException
+   */
+  protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+    long currentDimensionOffset = dimensionChunksOffset.get(startColumnBlockletIndex);
     ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+        (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
     synchronized (fileReader) {
       fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
-          (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+          (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
     }
     DimensionRawColumnChunk[] dataChunks =
-        new DimensionRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+        new DimensionRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
     int index = 0;
     int runningLength = 0;
-    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+    for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
       int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
       dataChunks[index] =
           new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
@@ -181,8 +126,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     int blockIndex = dimensionRawColumnChunk.getBlockletId();
     ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
     if (dimensionChunksOffset.size() - 1 == blockIndex) {
-      dimensionColumnChunk = CarbonUtil
-          .readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
+      dimensionColumnChunk =
+          CarbonUtil.readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
       int totalDimensionDataLength =
           dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
               + dimensionColumnChunk.rowid_page_length;
@@ -202,8 +147,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     rawData.position(copySourcePoint);
     rawData.get(data);
     // first read the data and uncompressed it
-    dataPage =
-        COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+    dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
     copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
@@ -223,8 +167,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
       byte[] dataRle = new byte[dimensionColumnChunk.rle_page_length];
       rawData.position(copySourcePoint);
       rawData.get(dataRle);
-      rlePage =
-          numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
+      rlePage = numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
     }
@@ -250,16 +193,4 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     }
     return columnDataChunk;
   }
-
-  /**
-   * Below method will be used to check whether particular encoding is present
-   * in the dimension or not
-   *
-   * @param encoding encoding to search
-   * @return if encoding is present in dimension
-   */
-  private boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
-    return encodings.contains(encoding);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..acaa2fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.Encoding;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Dimension column V3 Reader class which will be used to read and uncompress
+ * V3 format data
+ * Data format:
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkReaderV2V3Format {
+
+  /**
+   * end position of last dimension in carbon data file
+   */
+  private long lastDimensionOffsets;
+
+  public CompressedDimensionChunkFileBasedReaderV3(BlockletInfo blockletInfo,
+      int[] eachColumnValueSize, String filePath) {
+    super(blockletInfo, eachColumnValueSize, filePath);
+    lastDimensionOffsets = blockletInfo.getDimensionOffset();
+  }
+
+  /**
+   * Below method will be used to read the dimension column data form carbon data file
+   * Steps for reading
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from data read
+   * 5. Create the raw chunk object and fill the details
+   *
+   * @param fileReader          reader for reading the column from carbon data file
+   * @param blockletColumnIndex blocklet index of the column in carbon data file
+   * @return dimension raw chunk
+   */
+  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
+      int blockletColumnIndex) throws IOException {
+    // get the current dimension offset
+    long currentDimensionOffset = dimensionChunksOffset.get(blockletColumnIndex);
+    int length = 0;
+    // to calculate the length of the data to be read:
+    // for a column other than the last column we can subtract the offset of the current
+    // column from the offset of the next column to get the total length.
+    // but for the last column we need to use lastDimensionOffsets, which is the end position
+    // of the last dimension; we subtract the current dimension offset from lastDimensionOffsets
+    if (dimensionChunksOffset.size() - 1 == blockletColumnIndex) {
+      length = (int) (lastDimensionOffsets - currentDimensionOffset);
+    } else {
+      length = (int) (dimensionChunksOffset.get(blockletColumnIndex + 1) - currentDimensionOffset);
+    }
+    // allocate the buffer
+    ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset, length);
+    }
+    // get the data chunk which will have all the details about the data pages
+    DataChunk3 dataChunk = CarbonUtil.readDataChunk3(buffer, 0, length);
+    // creating a raw chunks instance and filling all the details
+    DimensionRawColumnChunk rawColumnChunk =
+        new DimensionRawColumnChunk(blockletColumnIndex, buffer, 0, length, this);
+    int numberOfPages = dataChunk.getPage_length().size();
+    byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+    byte[][] minValueOfEachPage = new byte[numberOfPages][];
+    int[] eachPageLength = new int[numberOfPages];
+    for (int i = 0; i < minValueOfEachPage.length; i++) {
+      maxValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+      minValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+      eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+    }
+    rawColumnChunk.setDataChunkV3(dataChunk);
+    rawColumnChunk.setFileHolder(fileReader);
+    rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+    rawColumnChunk.setMaxValues(maxValueOfEachPage);
+    rawColumnChunk.setMinValues(minValueOfEachPage);
+    rawColumnChunk.setRowCount(eachPageLength);
+    rawColumnChunk.setLengths(ArrayUtils
+        .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+    rawColumnChunk.setOffsets(ArrayUtils
+        .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+    return rawColumnChunk;
+  }
+
+  /**
+   * Below method will be used to read the multiple dimension column data in group
+   * and divide into dimension raw chunk object
+   * Steps for reading
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from file for each column
+   * 5. Create the raw chunk object and fill the details for each column
+   * 6. increment the offset of the data
+   *
+   * @param fileReader
+   *        reader which will be used to read the dimension columns data from file
+   * @param startBlockletColumnIndex
+   *        blocklet index of the first dimension column
+   * @param endBlockletColumnIndex
+   *        blocklet index of the last dimension column
+   * @return DimensionRawColumnChunk array
+   */
+  protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+      int startBlockletColumnIndex, int endBlockletColumnIndex) throws IOException {
+    // to calculate the length of the data to be read:
+    // we can subtract the offset of the start column from the offset of the
+    // (end column + 1) to get the total length.
+    long currentDimensionOffset = dimensionChunksOffset.get(startBlockletColumnIndex);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(
+        (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
+          (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+    }
+    // create raw chunk for each dimension column
+    DimensionRawColumnChunk[] dimensionDataChunks =
+        new DimensionRawColumnChunk[endBlockletColumnIndex - startBlockletColumnIndex + 1];
+    int index = 0;
+    int runningLength = 0;
+    for (int i = startBlockletColumnIndex; i <= endBlockletColumnIndex; i++) {
+      int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
+      dimensionDataChunks[index] =
+          new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
+      DataChunk3 dataChunk =
+          CarbonUtil.readDataChunk3(buffer, runningLength, dimensionChunksLength.get(i));
+      int numberOfPages = dataChunk.getPage_length().size();
+      byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+      byte[][] minValueOfEachPage = new byte[numberOfPages][];
+      int[] eachPageLength = new int[numberOfPages];
+      for (int j = 0; j < minValueOfEachPage.length; j++) {
+        maxValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+        minValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+        eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+      }
+      dimensionDataChunks[index].setDataChunkV3(dataChunk);
+      dimensionDataChunks[index].setFileHolder(fileReader);
+      dimensionDataChunks[index].setPagesCount(dataChunk.getPage_length().size());
+      dimensionDataChunks[index].setMaxValues(maxValueOfEachPage);
+      dimensionDataChunks[index].setMinValues(minValueOfEachPage);
+      dimensionDataChunks[index].setRowCount(eachPageLength);
+      dimensionDataChunks[index].setLengths(ArrayUtils
+          .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+      dimensionDataChunks[index].setOffsets(ArrayUtils
+          .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+      runningLength += currentLength;
+      index++;
+    }
+    return dimensionDataChunks;
+  }
+
+  /**
+   * Below method will be used to convert the compressed dimension chunk raw data to actual data
+   *
+   * @param dimensionRawColumnChunk dimension raw chunk
+   * @param pageNumber              page number to be converted
+   * @return DimensionColumnDataChunk
+   */
+  @Override public DimensionColumnDataChunk convertToDimensionChunk(
+      DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+    // data chunk of page
+    DataChunk2 dimensionColumnChunk = null;
+    // data chunk of blocklet column
+    DataChunk3 dataChunk3 = dimensionRawColumnChunk.getDataChunkV3();
+    // get the data buffer
+    ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
+    dimensionColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+    // calculating the start point of data
+    // as buffer can contain multiple column data, start point will be datachunkoffset +
+    // data chunk length + page offset
+    int copySourcePoint = dimensionRawColumnChunk.getOffSet() + dimensionChunksLength
+        .get(dimensionRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+    byte[] data = new byte[dimensionColumnChunk.data_page_length];
+    rawData.position(copySourcePoint);
+    rawData.get(data);
+    // first read the data and uncompress it
+    dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+    copySourcePoint += dimensionColumnChunk.data_page_length;
+    // if row id block is present then read the row id chunk and uncompress it
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+      invertedIndexes = CarbonUtil
+          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, rawData,
+              copySourcePoint);
+      copySourcePoint += dimensionColumnChunk.rowid_page_length;
+      // get the reverse index
+      invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+    }
+    // if RLE is applied then read the RLE block chunk and then uncompress
+    // the actual data based on the RLE block
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+      rlePage =
+          CarbonUtil.getIntArray(rawData, copySourcePoint, dimensionColumnChunk.rle_page_length);
+      // uncompress the data with rle indexes
+      dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage,
+          eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+      rlePage = null;
+    }
+    // fill chunk attributes
+    DimensionColumnDataChunk columnDataChunk = null;
+
+    if (dimensionColumnChunk.isRowMajor()) {
+      // to store fixed length column chunk values
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage,
+          eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()],
+          dimensionRawColumnChunk.getRowCount()[pageNumber]);
+    }
+    // if no dictionary column then first create a no dictionary column chunk
+    // and set to data chunk instance
+    else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+      columnDataChunk =
+          new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+              dimensionRawColumnChunk.getRowCount()[pageNumber]);
+    } else {
+      // to store fixed length column chunk values
+      columnDataChunk =
+          new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+              dimensionRawColumnChunk.getRowCount()[pageNumber],
+              eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+    }
+    return columnDataChunk;
+  }
+}
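
A hedged usage sketch of the V3 dimension reader: the raw blocklet-column chunk is read once (all pages in one IO) and individual pages are decoded on demand. getPagesCount is assumed to mirror the setPagesCount call above, and fileReader/blockletInfo/eachColumnValueSize/filePath/columnIndex stand for values the query executor already holds.

CompressedDimensionChunkFileBasedReaderV3 reader =
    new CompressedDimensionChunkFileBasedReaderV3(blockletInfo, eachColumnValueSize, filePath);
// one IO: reads the DataChunk3 header plus all pages of this blocklet column
DimensionRawColumnChunk rawChunk = reader.readRawDimensionChunk(fileReader, columnIndex);
for (int page = 0; page < rawChunk.getPagesCount(); page++) {
  // decode one page (by default up to 32,000 rows) only when it is actually needed
  DimensionColumnDataChunk pageChunk = reader.convertToDimensionChunk(rawChunk, page);
}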

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
new file mode 100644
index 0000000..a94d08b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
+
+/**
+ * Abstract class for V2, V3 format measure column reader
+ */
+public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasureChunkReader {
+
+  /**
+   * measure column chunks offset
+   */
+  protected List<Long> measureColumnChunkOffsets;
+
+  /**
+   * measure column chunks length
+   */
+  protected List<Integer> measureColumnChunkLength;
+
+  public AbstractMeasureChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+      final String filePath) {
+    super(filePath, blockletInfo.getNumberOfRows());
+    this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
+    this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
+  }
+
+  /**
+   * Below method will be used to read the chunk based on block indexes
+   * Reading logic of this method: Except for the last column, all column chunks
+   * can be read in a group: for non-last columns, the data of all the columns
+   * present in a block index range is read together and then processed. The last
+   * column is read and processed separately.
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes blocks range to be read
+   * @return measure column chunks
+   * @throws IOException
+   */
+  public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    // read the column chunk based on block index and add
+    MeasureRawColumnChunk[] dataChunks =
+        new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
+    if (blockIndexes.length == 0) {
+      return dataChunks;
+    }
+    MeasureRawColumnChunk[] groupChunk = null;
+    int index = 0;
+    for (int i = 0; i < blockIndexes.length - 1; i++) {
+      index = 0;
+      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
+      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
+          readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+    } else {
+      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+          blockIndexes[blockIndexes.length - 1][1]);
+      index = 0;
+      for (int j = blockIndexes[blockIndexes.length - 1][0];
+           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Below method will be used to convert the thrift presence meta to wrapper
+   * presence meta
+   *
+   * @param presentMetadataThrift
+   * @return wrapper presence meta
+   */
+  protected PresenceMeta getPresenceMeta(
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+    PresenceMeta presenceMeta = new PresenceMeta();
+    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+    presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+        .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
+    return presenceMeta;
+  }
+
+  /**
+   * Below method will be used to read measure chunk data in group.
+   * This method is useful to avoid multiple IO calls while reading the
+   * data from the file.
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read
+   * @return measure raw chunkArray
+   * @throws IOException
+   */
+  protected abstract MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+}
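
A small self-contained illustration of the BitSet reconstruction used in getPresenceMeta above: java.util.BitSet.valueOf reads the (decompressed) byte stream with the least significant bit of each byte first, so the flagged row positions can be queried directly. The byte value below is made up for illustration.

byte[] decompressedBitStream = new byte[] { 0x05 };  // binary 0000_0101: bits 0 and 2 set
java.util.BitSet presence = java.util.BitSet.valueOf(decompressedBitStream);
// presence.get(0) == true, presence.get(1) == false, presence.get(2) == true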

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 7ac1578..7b6acee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -19,37 +19,24 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v2;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 
 /**
  * Class to read the measure column data for version 2
  */
-public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReader {
-
-  /**
-   * measure column chunks offset
-   */
-  private List<Long> measureColumnChunkOffsets;
-
-  /**
-   * measure column chunks length
-   */
-  private List<Integer> measureColumnChunkLength;
+public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReaderV2V3Format {
 
   /**
    * Constructor to get minimum parameter to create instance of this class
@@ -59,69 +46,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
    */
   public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final String filePath) {
-    super(filePath, blockletInfo.getNumberOfRows());
-    this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
-    this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
-  }
-
-  /**
-   * Below method will be used to convert the thrift presence meta to wrapper
-   * presence meta
-   *
-   * @param presentMetadataThrift
-   * @return wrapper presence meta
-   */
-  private static PresenceMeta getPresenceMeta(
-      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    PresenceMeta presenceMeta = new PresenceMeta();
-    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
-    presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
-        .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
-    return presenceMeta;
-  }
-
-  /**
-   * Below method will be used to read the chunk based on block indexes
-   * Reading logic of below method is: Except last column all the column chunk
-   * can be read in group if not last column then read data of all the column
-   * present in block index together then process it. For last column read is
-   * separately and process
-   *
-   * @param fileReader   file reader to read the blocks from file
-   * @param blockIndexes blocks range to be read
-   * @return measure column chunks
-   * @throws IOException
-   */
-  public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
-      throws IOException {
-    // read the column chunk based on block index and add
-    MeasureRawColumnChunk[] dataChunks =
-        new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
-    if (blockIndexes.length == 0) {
-      return dataChunks;
-    }
-    MeasureRawColumnChunk[] groupChunk = null;
-    int index = 0;
-    for (int i = 0; i < blockIndexes.length - 1; i++) {
-      index = 0;
-      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
-      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
-      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
-          readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
-    } else {
-      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
-          blockIndexes[blockIndexes.length - 1][1]);
-      index = 0;
-      for (int j = blockIndexes[blockIndexes.length - 1][0];
-           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    return dataChunks;
+    super(blockletInfo, filePath);
   }
 
   @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
@@ -146,20 +71,31 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     return rawColumnChunk;
   }
 
-  private MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
-      int startBlockIndex, int endBlockIndex) throws IOException {
-    long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
+  /**
+   * Below method will be used to read measure chunk data in group.
+   * This method will be useful to avoid multiple IO while reading the
+   * data from the file.
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read
+   * @return measure raw chunk array
+   * @throws IOException
+   */
+  protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+    long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
     ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+        (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
     synchronized (fileReader) {
       fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
-          (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+          (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
     }
     MeasureRawColumnChunk[] dataChunks =
-        new MeasureRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+        new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
     int runningLength = 0;
     int index = 0;
-    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+    for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
       int currentLength =
           (int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
       MeasureRawColumnChunk measureRawColumnChunk =

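The grouped read above trades several small IOs for one large one: contiguous measure column chunks are fetched into a single direct buffer and then sliced per column by a running offset. The following standalone sketch only illustrates that slicing arithmetic; the class name and all offsets are made up and are not part of this patch.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class GroupedChunkReadSketch {
  public static void main(String[] args) {
    // hypothetical offsets of three adjacent measure column chunks;
    // the entry after the last chunk marks where the following chunk begins
    List<Long> chunkOffsets = Arrays.asList(1000L, 1400L, 1900L, 2600L);
    int startColumn = 0;
    int endColumn = 2;
    long base = chunkOffsets.get(startColumn);
    int totalLength = (int) (chunkOffsets.get(endColumn + 1) - base);
    // one direct-buffer read instead of three separate reads
    ByteBuffer buffer = ByteBuffer.allocateDirect(totalLength);
    int runningLength = 0;
    for (int i = startColumn; i <= endColumn; i++) {
      int currentLength = (int) (chunkOffsets.get(i + 1) - chunkOffsets.get(i));
      // a raw column chunk for column i would wrap (buffer, runningLength, currentLength)
      System.out.println("column " + i + " -> offset " + runningLength
          + ", length " + currentLength);
      runningLength += currentLength;
    }
  }
}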
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..307af41
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Measure column V3 Reader class which will be used to read and uncompress
+ * V3 format data.
+ * Data format:
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChunkReaderV2V3Format {
+
+  /**
+   * end position of last measure in carbon data file
+   */
+  private long measureOffsets;
+
+  public CompressedMeasureChunkFileBasedReaderV3(BlockletInfo blockletInfo, String filePath) {
+    super(blockletInfo, filePath);
+    measureOffsets = blockletInfo.getMeasureOffsets();
+  }
+
+  /**
+   * Below method will be used to read the measure column data from carbon data file
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from data read
+   * 5. Create the raw chunk object and fill the details
+   *
+   * @param fileReader          reader for reading the column from carbon data file
+   * @param blockIndex          blocklet index of the column in carbon data file
+   * @return measure raw chunk
+   */
+  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException {
+    int dataLength = 0;
+    // to calculate the length of the data to be read
+    // for any column other than the last one we can subtract the offset of the current
+    // column from the offset of the next column to get the total length.
+    // but for the last column we need to use measureOffsets, which is the end position
+    // of the last measure in the file, and subtract the current column offset from it
+    if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
+      dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockIndex));
+    } else {
+      dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - measureColumnChunkOffsets
+          .get(blockIndex));
+    }
+    // allocate the buffer
+    ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader
+          .readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
+    }
+    // get the data chunk which will have all the details about the data pages
+    DataChunk3 dataChunk =
+        CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockIndex));
+    // creating a raw chunks instance and filling all the details
+    MeasureRawColumnChunk rawColumnChunk =
+        new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
+    int numberOfPages = dataChunk.getPage_length().size();
+    byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+    byte[][] minValueOfEachPage = new byte[numberOfPages][];
+    int[] eachPageLength = new int[numberOfPages];
+    for (int i = 0; i < minValueOfEachPage.length; i++) {
+      maxValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+      minValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+      eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+    }
+    rawColumnChunk.setDataChunkV3(dataChunk);
+    rawColumnChunk.setFileReader(fileReader);
+    rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+    rawColumnChunk.setMaxValues(maxValueOfEachPage);
+    rawColumnChunk.setMinValues(minValueOfEachPage);
+    rawColumnChunk.setRowCount(eachPageLength);
+    rawColumnChunk.setLengths(ArrayUtils
+        .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+    rawColumnChunk.setOffsets(ArrayUtils
+        .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+    return rawColumnChunk;
+  }
+
+  /**
+   * Below method will be used to read data of multiple measure columns in a group
+   * and divide it into measure raw chunk objects
+   * Steps for reading
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from file for each column
+   * 5. Create the raw chunk object and fill the details for each column
+   * 6. increment the offset of the data
+   *
+   * @param fileReader
+   *        reader which will be used to read the measure columns data from file
+   * @param startColumnBlockletIndex
+   *        blocklet index of the first measure column
+   * @param endColumnBlockletIndex
+   *        blocklet index of the last measure column
+   * @return MeasureRawColumnChunk array
+   */
+  protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+    // to calculate the length of the data to be read
+    // we can subtract the offset of the start column from the offset of the
+    // column after the end column and get the total length.
+    long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(
+        (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
+          (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+    }
+    // create raw chunk for each measure column
+    MeasureRawColumnChunk[] measureDataChunk =
+        new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
+    int runningLength = 0;
+    int index = 0;
+    for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
+      int currentLength =
+          (int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
+      MeasureRawColumnChunk measureRawColumnChunk =
+          new MeasureRawColumnChunk(i, buffer, runningLength, currentLength, this);
+      DataChunk3 dataChunk =
+          CarbonUtil.readDataChunk3(buffer, runningLength, measureColumnChunkLength.get(i));
+
+      int numberOfPages = dataChunk.getPage_length().size();
+      byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+      byte[][] minValueOfEachPage = new byte[numberOfPages][];
+      int[] eachPageLength = new int[numberOfPages];
+      for (int j = 0; j < minValueOfEachPage.length; j++) {
+        maxValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+        minValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+        eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+      }
+      measureRawColumnChunk.setDataChunkV3(dataChunk);
+      measureRawColumnChunk.setFileReader(fileReader);
+      measureRawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+      measureRawColumnChunk.setMaxValues(maxValueOfEachPage);
+      measureRawColumnChunk.setMinValues(minValueOfEachPage);
+      measureRawColumnChunk.setRowCount(eachPageLength);
+      measureRawColumnChunk.setLengths(ArrayUtils
+          .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+      measureRawColumnChunk.setOffsets(ArrayUtils
+          .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+      measureDataChunk[index] = measureRawColumnChunk;
+      runningLength += currentLength;
+      index++;
+    }
+    return measureDataChunk;
+  }
+
+  /**
+   * Below method will be used to convert the compressed measure chunk raw data to actual data
+   *
+   * @param measureRawColumnChunk measure raw chunk
+   * @param pageNumber            page number of the page to be converted
+   * @return MeasureColumnDataChunk
+   */
+  @Override public MeasureColumnDataChunk convertToMeasureChunk(
+      MeasureRawColumnChunk measureRawColumnChunk, int pageNumber) throws IOException {
+    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
+    // data chunk of blocklet column
+    DataChunk3 dataChunk3 = measureRawColumnChunk.getDataChunkV3();
+    // data chunk of page
+    DataChunk2 measureColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+    // calculating the start point of data
+    // as buffer can contain multiple column data, start point will be datachunkoffset +
+    // data chunk length + page offset
+    int copyPoint = measureRawColumnChunk.getOffSet() + measureColumnChunkLength
+        .get(measureRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+    List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+    for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
+      valueEncodeMeta.add(CarbonUtil
+          .deserializeEncoderMetaNew(measureColumnChunk.getEncoder_meta().get(i).array()));
+    }
+    WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+    ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+    // uncompress
+    byte[] data = new byte[measureColumnChunk.data_page_length];
+    ByteBuffer rawData = measureRawColumnChunk.getRawData();
+    rawData.position(copyPoint);
+    rawData.get(data);
+    values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
+        measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
+        compressionModel.getMaxValue()[0], measureRawColumnChunk.getRowCount()[pageNumber]);
+    CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
+    // set the data chunk
+    datChunk.setMeasureDataHolder(measureDataHolder);
+    // set the null value indexes
+    datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+    return datChunk;
+  }
+}
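readRawMeasureChunk above computes the byte length of the last column differently, because it has no next offset to subtract; the end position of the measure data (measureOffsets) stands in for it. A small standalone sketch of that branch, with made-up offsets that are not part of this patch:

import java.util.Arrays;
import java.util.List;

public class ChunkLengthSketch {
  // hypothetical offsets of three measure chunks, with the data ending at measureOffsets
  static int chunkLength(List<Long> chunkOffsets, long measureOffsets, int blockIndex) {
    if (chunkOffsets.size() - 1 == blockIndex) {
      return (int) (measureOffsets - chunkOffsets.get(blockIndex));      // last column
    }
    return (int) (chunkOffsets.get(blockIndex + 1) - chunkOffsets.get(blockIndex));
  }

  public static void main(String[] args) {
    List<Long> offsets = Arrays.asList(1000L, 1400L, 1900L);
    System.out.println(chunkLength(offsets, 2600L, 1)); // 500, from the next offset
    System.out.println(chunkLength(offsets, 2600L, 2)); // 700, from measureOffsets
  }
}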

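In convertToMeasureChunk above, a page's compressed bytes are located by adding three quantities: the chunk's offset inside the shared buffer, the serialized DataChunk3 header length, and the page offset recorded in that header. A tiny sketch of the arithmetic with made-up numbers, not part of this patch:

public class PageCopyPointSketch {
  public static void main(String[] args) {
    int chunkOffsetInBuffer = 4096;    // MeasureRawColumnChunk.getOffSet()
    int dataChunk3HeaderLength = 180;  // measureColumnChunkLength.get(blockletId)
    int pageOffset = 2500;             // DataChunk3.getPage_offset().get(pageNumber)
    int copyPoint = chunkOffsetInBuffer + dataChunk3HeaderLength + pageOffset;
    System.out.println("page data begins at byte " + copyPoint); // prints 6776
  }
}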
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 14e7938..23af707 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Below class will be used to store fixed length dimension data
@@ -66,13 +67,7 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
     }
     // below part is to convert the byte array to surrogate value
     int startOffsetOfData = index * columnValueSize;
-    int surrogate = 0;
-    for (int i = 0; i < columnValueSize; i++) {
-      surrogate <<= 8;
-      surrogate ^= data[startOffsetOfData] & 0xFF;
-      startOffsetOfData++;
-    }
-    return surrogate;
+    return CarbonUtil.getSurrogateInternal(data, startOffsetOfData, columnValueSize);
   }
 
   /**

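The loop removed above rebuilt a dictionary surrogate from a fixed number of big-endian bytes; that logic now lives in CarbonUtil.getSurrogateInternal. Below is a standalone sketch of the same conversion; the class and helper names are illustrative, only the arithmetic mirrors the removed code.

public class SurrogateDecodeSketch {
  // rebuild an int surrogate from columnValueSize big-endian bytes starting at offset
  static int toSurrogate(byte[] data, int offset, int columnValueSize) {
    int surrogate = 0;
    for (int i = 0; i < columnValueSize; i++) {
      surrogate <<= 8;
      surrogate ^= data[offset + i] & 0xFF;
    }
    return surrogate;
  }

  public static void main(String[] args) {
    byte[] page = {0x00, 0x01, 0x02, 0x03};
    // 3-byte column value starting at offset 1: 0x010203 = 66051
    System.out.println(toSurrogate(page, 1, 3));
  }
}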
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
new file mode 100644
index 0000000..346d8d8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
+
+  private boolean alreadySorted;
+
+  private short[] dataAfterComp;
+
+  private short[] indexMap;
+
+  private byte[][] keyBlock;
+
+  private short[] dataIndexMap;
+
+  private int totalSize;
+
+  public BlockIndexerStorageForShort(byte[][] keyBlock, boolean compressData,
+      boolean isNoDictionary, boolean isSortRequired) {
+    ColumnWithShortIndex[] columnWithIndexs = createColumnWithIndexArray(keyBlock, isNoDictionary);
+    if (isSortRequired) {
+      Arrays.sort(columnWithIndexs);
+    }
+    compressMyOwnWay(extractDataAndReturnIndexes(columnWithIndexs, keyBlock));
+    if (compressData) {
+      compressDataMyOwnWay(columnWithIndexs);
+    }
+  }
+
+  /**
+   * Create an object with each column array and respective index
+   *
+   * @return column values paired with their row indexes
+   */
+  private ColumnWithShortIndex[] createColumnWithIndexArray(byte[][] keyBlock,
+      boolean isNoDictionary) {
+    ColumnWithShortIndex[] columnWithIndexs;
+    if (isNoDictionary) {
+      columnWithIndexs = new ColumnWithShortIndex[keyBlock.length];
+      for (short i = 0; i < columnWithIndexs.length; i++) {
+        columnWithIndexs[i] = new ColumnWithShortIndexForNoDictionay(keyBlock[i], i);
+      }
+    } else {
+      columnWithIndexs = new ColumnWithShortIndex[keyBlock.length];
+      for (short i = 0; i < columnWithIndexs.length; i++) {
+        columnWithIndexs[i] = new ColumnWithShortIndex(keyBlock[i], i);
+      }
+    }
+    return columnWithIndexs;
+  }
+
+  private short[] extractDataAndReturnIndexes(ColumnWithShortIndex[] columnWithIndexs,
+      byte[][] keyBlock) {
+    short[] indexes = new short[columnWithIndexs.length];
+    for (int i = 0; i < indexes.length; i++) {
+      indexes[i] = columnWithIndexs[i].getIndex();
+      keyBlock[i] = columnWithIndexs[i].getColumn();
+    }
+    this.keyBlock = keyBlock;
+    return indexes;
+  }
+
+  /**
+   * Compresses the indexes based on runs of sequential numbers.
+   * For example, [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,6]:
+   * the first array stores the start and end of each sequential run, while the
+   * second array keeps track of where the sequential runs start. If there are
+   * no sequential numbers, the original array is returned and the second
+   * array is empty.
+   *
+   * @param indexes row indexes to be compressed
+   */
+  public void compressMyOwnWay(short[] indexes) {
+    List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    int k = 0;
+    int i = 1;
+    for (; i < indexes.length; i++) {
+      if (indexes[i] - indexes[i - 1] == 1) {
+        k++;
+      } else {
+        if (k > 0) {
+          map.add(((short) list.size()));
+          list.add(indexes[i - k - 1]);
+          list.add(indexes[i - 1]);
+        } else {
+          list.add(indexes[i - 1]);
+        }
+        k = 0;
+      }
+    }
+    if (k > 0) {
+      map.add(((short) list.size()));
+      list.add(indexes[i - k - 1]);
+      list.add(indexes[i - 1]);
+    } else {
+      list.add(indexes[i - 1]);
+    }
+    double compressionPercentage = (((list.size() + map.size()) * 100) / indexes.length);
+    if (compressionPercentage > 70) {
+      dataAfterComp = indexes;
+    } else {
+      dataAfterComp = convertToArray(list);
+    }
+    if (indexes.length == dataAfterComp.length) {
+      indexMap = new short[0];
+    } else {
+      indexMap = convertToArray(map);
+    }
+    if (dataAfterComp.length == 2 && indexMap.length == 1) {
+      alreadySorted = true;
+    }
+  }
+
+  private short[] convertToArray(List<Short> list) {
+    short[] shortArray = new short[list.size()];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i);
+    }
+    return shortArray;
+  }
+
+  /**
+   * @return the alreadySorted
+   */
+  public boolean isAlreadySorted() {
+    return alreadySorted;
+  }
+
+  /**
+   * @return the dataAfterComp
+   */
+  public short[] getDataAfterComp() {
+    return dataAfterComp;
+  }
+
+  /**
+   * @return the indexMap
+   */
+  public short[] getIndexMap() {
+    return indexMap;
+  }
+
+  /**
+   * @return the keyBlock
+   */
+  public byte[][] getKeyBlock() {
+    return keyBlock;
+  }
+
+  private void compressDataMyOwnWay(ColumnWithShortIndex[] indexes) {
+    byte[] prvKey = indexes[0].getColumn();
+    List<ColumnWithShortIndex> list =
+        new ArrayList<ColumnWithShortIndex>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    list.add(indexes[0]);
+    short counter = 1;
+    short start = 0;
+    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    for (int i = 1; i < indexes.length; i++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, indexes[i].getColumn()) != 0) {
+        prvKey = indexes[i].getColumn();
+        list.add(indexes[i]);
+        map.add(start);
+        map.add(counter);
+        start += counter;
+        counter = 1;
+        continue;
+      }
+      counter++;
+    }
+    map.add(start);
+    map.add(counter);
+    this.keyBlock = convertToKeyArray(list);
+    if (indexes.length == keyBlock.length) {
+      dataIndexMap = new short[0];
+    } else {
+      dataIndexMap = convertToArray(map);
+    }
+  }
+
+  private byte[][] convertToKeyArray(List<ColumnWithShortIndex> list) {
+    byte[][] shortArray = new byte[list.size()][];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i).getColumn();
+      totalSize += shortArray[i].length;
+    }
+    return shortArray;
+  }
+
+  @Override public short[] getDataIndexMap() {
+    return dataIndexMap;
+  }
+
+  @Override public int getTotalSize() {
+    return totalSize;
+  }
+
+  @Override public byte[] getMin() {
+    return keyBlock[0];
+  }
+
+  @Override public byte[] getMax() {
+    return keyBlock[keyBlock.length - 1];
+  }
+
+}
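The run-length idea behind compressMyOwnWay can be exercised in isolation. The sketch below is a simplified re-implementation for illustration only; it omits the 70% size threshold and the alreadySorted flag that the real class applies. For a fully sorted page it collapses ten row indexes into a single (first, last) pair.

import java.util.ArrayList;
import java.util.List;

public class RowIndexRleSketch {
  public static void main(String[] args) {
    short[] rowIndexes = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
    List<Short> compressed = new ArrayList<>();
    List<Short> runStartPositions = new ArrayList<>();
    int runLength = 0;
    int i = 1;
    for (; i < rowIndexes.length; i++) {
      if (rowIndexes[i] - rowIndexes[i - 1] == 1) {
        runLength++;                              // still inside a consecutive run
      } else {
        flush(rowIndexes, i, runLength, compressed, runStartPositions);
        runLength = 0;
      }
    }
    flush(rowIndexes, i, runLength, compressed, runStartPositions);
    // prints [0, 9] [0]: one run stored as (first, last),
    // recorded at position 0 of the compressed list
    System.out.println(compressed + " " + runStartPositions);
  }

  // emit either the single previous value or the (start, end) pair of a finished run
  private static void flush(short[] rowIndexes, int i, int runLength,
      List<Short> compressed, List<Short> runStartPositions) {
    if (runLength > 0) {
      runStartPositions.add((short) compressed.size());
      compressed.add(rowIndexes[i - runLength - 1]);
      compressed.add(rowIndexes[i - 1]);
    } else {
      compressed.add(rowIndexes[i - 1]);
    }
  }
}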

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
new file mode 100644
index 0000000..57447a3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class ColumnWithShortIndex implements Comparable<ColumnWithShortIndex> {
+  protected byte[] column;
+
+  private short index;
+
+  public ColumnWithShortIndex(byte[] column, short index) {
+    this.column = column;
+    this.index = index;
+  }
+
+  /**
+   * @return the column
+   */
+  public byte[] getColumn() {
+    return column;
+  }
+
+  /**
+   * @param column the column to set
+   */
+  public void setColumn(byte[] column) {
+    this.column = column;
+  }
+
+  /**
+   * @return the index
+   */
+  public short getIndex() {
+    return index;
+  }
+
+  /**
+   * @param index the index to set
+   */
+  public void setIndex(short index) {
+    this.index = index;
+  }
+
+  @Override public int compareTo(ColumnWithShortIndex o) {
+    return ByteUtil.UnsafeComparer.INSTANCE.compareTo(column, o.column);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ColumnWithShortIndex o = (ColumnWithShortIndex)obj;
+    return Arrays.equals(column, o.column) && index == o.index;
+  }
+
+  @Override public int hashCode() {
+    return Arrays.hashCode(column) + index;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
new file mode 100644
index 0000000..34cce63
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+public class ColumnWithShortIndexForNoDictionay extends ColumnWithShortIndex
+    implements Comparable<ColumnWithShortIndex> {
+
+  public ColumnWithShortIndexForNoDictionay(byte[] column, short index) {
+    super(column, index);
+  }
+
+  @Override public int compareTo(ColumnWithShortIndex o) {
+    return UnsafeComparer.INSTANCE
+        .compareTo(column, 2, column.length - 2, o.column, 2, o.column.length - 2);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ColumnWithShortIndexForNoDictionay o = (ColumnWithShortIndexForNoDictionay) obj;
+    return Arrays.equals(column, o.column) && getIndex() == o.getIndex();
+  }
+
+  @Override public int hashCode() {
+    return Arrays.hashCode(column) + getIndex();
+  }
+}
\ No newline at end of file
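For the no-dictionary variant, the comparison starts at byte offset 2, which presumably skips a two-byte length prefix carried by each no-dictionary value, so rows are ordered by the payload alone. A small sketch of the same comparison call; the byte values are made up and the length-prefix interpretation is an assumption, not stated by this patch.

import org.apache.carbondata.core.util.ByteUtil;

public class NoDictionaryCompareSketch {
  public static void main(String[] args) {
    // two hypothetical no-dictionary values: 2-byte length prefix followed by "abc" / "abd"
    byte[] left = {0, 3, 'a', 'b', 'c'};
    byte[] right = {0, 3, 'a', 'b', 'd'};
    int cmp = ByteUtil.UnsafeComparer.INSTANCE
        .compareTo(left, 2, left.length - 2, right, 2, right.length - 2);
    System.out.println(cmp < 0 ? "\"abc\" sorts before \"abd\"" : "unexpected order");
  }
}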

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
index 7629895..240f891 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
@@ -19,7 +19,8 @@ package org.apache.carbondata.core.metadata;
 
 public enum ColumnarFormatVersion {
   V1((short)1),
-  V2((short)2);
+  V2((short)2),
+  V3((short)3);
 
   private short version;
   ColumnarFormatVersion(short version) {
@@ -43,8 +44,12 @@ public enum ColumnarFormatVersion {
       case 1:
         // after multiple reader support, user can write new file with version 1
         return V1;
-      default:
+      case 2:
         return V2;
+      case 3:
+        return V3;
+      default:
+        return V3;
     }
   }
 }
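Assuming the switch above sits in the enum's valueOf(short) factory (only its body is visible in this hunk), the practical effect of the change is that a stored version number of 3, or any unrecognized value, now resolves to V3, while 1 and 2 keep resolving to V1 and V2. A hedged usage sketch under that assumption:

import org.apache.carbondata.core.metadata.ColumnarFormatVersion;

public class FormatVersionSketch {
  public static void main(String[] args) {
    // assumption: the factory shown above is ColumnarFormatVersion.valueOf(short)
    System.out.println(ColumnarFormatVersion.valueOf((short) 1));  // V1
    System.out.println(ColumnarFormatVersion.valueOf((short) 3));  // V3
    System.out.println(ColumnarFormatVersion.valueOf((short) 99)); // falls back to V3
  }
}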